Coverage Report

Created: 2026-05-30 06:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/htslib/bgzf.c
Line
Count
Source
1
/* The MIT License
2
3
   Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4
                 2011, 2012 Attractive Chaos <attractor@live.co.uk>
5
   Copyright (C) 2009, 2013-2025 Genome Research Ltd
6
7
   Permission is hereby granted, free of charge, to any person obtaining a copy
8
   of this software and associated documentation files (the "Software"), to deal
9
   in the Software without restriction, including without limitation the rights
10
   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11
   copies of the Software, and to permit persons to whom the Software is
12
   furnished to do so, subject to the following conditions:
13
14
   The above copyright notice and this permission notice shall be included in
15
   all copies or substantial portions of the Software.
16
17
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19
   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20
   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21
   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22
   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23
   THE SOFTWARE.
24
*/
25
26
#define HTS_BUILDING_LIBRARY // Enables HTSLIB_EXPORT, see htslib/hts_defs.h
27
#include <config.h>
28
29
#include <stdio.h>
30
#include <stdlib.h>
31
#include <string.h>
32
#include <errno.h>
33
#include <unistd.h>
34
#include <assert.h>
35
#include <pthread.h>
36
#include <sys/types.h>
37
#include <inttypes.h>
38
#include <zlib.h>
39
40
#ifdef HAVE_LIBDEFLATE
41
#include <libdeflate.h>
42
#endif
43
44
#include "htslib/hts.h"
45
#include "htslib/bgzf.h"
46
#include "htslib/hfile.h"
47
#include "htslib/thread_pool.h"
48
#include "htslib/hts_alloc.h"
49
#include "htslib/hts_endian.h"
50
#include "cram/pooled_alloc.h"
51
#include "hts_internal.h"
52
#include "bgzf_internal.h"
53
#include "htslib/khash.h"
54
55
#ifndef EFTYPE
56
49
#define EFTYPE ENOEXEC
57
#endif
58
59
#define BGZF_MT
60
61
135k
#define BLOCK_HEADER_LENGTH 18
62
67.4k
#define BLOCK_FOOTER_LENGTH 8
63
64
65
/* BGZF/GZIP header (specialized from RFC 1952; little endian):
66
 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
67
 | 31|139|  8|  4|              0|  0|255|      6| 66| 67|      2|BLK_LEN|
68
 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
69
  BGZF extension:
70
                ^                              ^   ^   ^
71
                |                              |   |   |
72
               FLG.EXTRA                     XLEN  B   C
73
74
  BGZF format is compatible with GZIP. It limits the size of each compressed
75
  block to 2^16 bytes and adds and an extra "BC" field in the gzip header which
76
  records the size.
77
78
*/
79
static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
80
81
typedef struct {
82
    int size;
83
    uint8_t *block;
84
    int64_t end_offset;
85
} cache_t;
86
87
KHASH_MAP_INIT_INT64(bgzf_cache, cache_t)
88
89
// struct bgzf_cache_t is defined in bgzf_internal.h
90
91
#ifdef BGZF_MT
92
93
typedef struct bgzf_job {
94
    BGZF *fp;
95
    unsigned char comp_data[BGZF_MAX_BLOCK_SIZE];
96
    size_t comp_len;
97
    unsigned char uncomp_data[BGZF_MAX_BLOCK_SIZE];
98
    size_t uncomp_len;
99
    int errcode;
100
    int64_t block_address;
101
    int hit_eof;
102
} bgzf_job;
103
104
enum mtaux_cmd {
105
    NONE = 0,
106
    SEEK,
107
    SEEK_DONE,
108
    HAS_EOF,
109
    HAS_EOF_DONE,
110
    CLOSE,
111
};
112
113
// When multi-threaded bgzf_tell won't work, so we delay the hts_idx_push
114
// until we've written the last block.
115
typedef struct {
116
    hts_pos_t beg, end;
117
    int tid, is_mapped;  // args for hts_idx_push
118
    uint64_t offset, block_number;
119
} hts_idx_cache_entry;
120
121
typedef struct {
122
    int nentries, mentries; // used and allocated
123
    hts_idx_cache_entry *e; // hts_idx elements
124
} hts_idx_cache_t;
125
126
typedef struct bgzf_mtaux_t {
127
    // Memory pool for bgzf_job structs, to avoid many malloc/free
128
    pool_alloc_t *job_pool;
129
    bgzf_job *curr_job;
130
131
    // Thread pool
132
    int n_threads;
133
    int own_pool;
134
    hts_tpool *pool;
135
136
    // Output queue holding completed bgzf_jobs
137
    hts_tpool_process *out_queue;
138
139
    // I/O thread.
140
    pthread_t io_task;
141
    pthread_mutex_t job_pool_m;
142
    int jobs_pending; // number of jobs waiting
143
    int flush_pending;
144
    void *free_block;
145
    int hit_eof;  // r/w entirely within main thread
146
147
    // Message passing to the reader thread; eg seek requests
148
    int errcode;
149
    uint64_t block_address;
150
    int eof;
151
    pthread_mutex_t command_m; // Set whenever fp is being updated
152
    pthread_cond_t command_c;
153
    enum mtaux_cmd command;
154
155
    // For multi-threaded on-the-fly indexing. See bgzf_idx_push below.
156
    pthread_mutex_t idx_m;
157
    hts_idx_t *hts_idx;
158
    uint64_t block_number, block_written;
159
    hts_idx_cache_t idx_cache;
160
} mtaux_t;
161
#endif
162
163
typedef struct
164
{
165
    uint64_t uaddr;  // offset w.r.t. uncompressed data
166
    uint64_t caddr;  // offset w.r.t. compressed data
167
}
168
bgzidx1_t;
169
170
struct bgzidx_t
171
{
172
    int noffs, moffs;       // the size of the index, n:used, m:allocated
173
    bgzidx1_t *offs;        // offsets
174
    uint64_t ublock_addr;   // offset of the current block (uncompressed data)
175
};
176
177
/*
178
 * Buffers up arguments to hts_idx_push for later use, once we've written all bar
179
 * this block.  This is necessary when multiple blocks are in flight (threading)
180
 * and fp->block_address isn't known at the time of call as we have in-flight
181
 * blocks that haven't yet been compressed.
182
 *
183
 * NB: this only matters when we're indexing on the fly (writing).
184
 * Normal indexing is threaded reads, but we already know block sizes
185
 * so it's a simpler process
186
 *
187
 * Returns 0 on success,
188
 *        -1 on failure
189
 */
190
0
int bgzf_idx_push(BGZF *fp, hts_idx_t *hidx, int tid, hts_pos_t beg, hts_pos_t end, uint64_t offset, int is_mapped) {
191
0
    hts_idx_cache_entry *e;
192
0
    mtaux_t *mt = fp->mt;
193
194
0
    if (!mt)
195
0
        return hts_idx_push(hidx, tid, beg, end, offset, is_mapped);
196
197
    // Early check for out of range positions which would fail in hts_idx_push()
198
0
    if (hts_idx_check_range(hidx, tid, beg, end) < 0)
199
0
        return -1;
200
201
0
    pthread_mutex_lock(&mt->idx_m);
202
203
0
    mt->hts_idx = hidx;
204
0
    hts_idx_cache_t *ic = &mt->idx_cache;
205
206
0
    if (ic->nentries >= ic->mentries) {
207
0
        int new_sz = ic->mentries ? ic->mentries*2 : 1024;
208
0
        if (!(e = hts_realloc_p(ic->e, sizeof(*ic->e), new_sz))) {
209
0
            pthread_mutex_unlock(&mt->idx_m);
210
0
            return -1;
211
0
        }
212
0
        ic->e = e;
213
0
        ic->mentries = new_sz;
214
0
    }
215
216
0
    e = &ic->e[ic->nentries++];
217
0
    e->tid = tid;
218
0
    e->beg = beg;
219
0
    e->end = end;
220
0
    e->is_mapped = is_mapped;
221
0
    e->offset = offset & 0xffff;
222
0
    e->block_number = mt->block_number;
223
224
0
    pthread_mutex_unlock(&mt->idx_m);
225
226
0
    return 0;
227
0
}
228
229
static int bgzf_idx_flush(BGZF *fp,
230
0
                          size_t block_uncomp_len, size_t block_comp_len) {
231
0
    mtaux_t *mt = fp->mt;
232
233
0
    if (!mt->idx_cache.e) {
234
0
        mt->block_written++;
235
0
        return 0;
236
0
    }
237
238
0
    pthread_mutex_lock(&mt->idx_m);
239
240
0
    hts_idx_cache_entry *e = mt->idx_cache.e;
241
0
    int i;
242
243
0
    assert(mt->idx_cache.nentries == 0 || mt->block_written <= e[0].block_number);
244
245
0
    for (i = 0; i < mt->idx_cache.nentries && e[i].block_number == mt->block_written; i++) {
246
0
        if (block_uncomp_len > 0 && e[i].offset == block_uncomp_len) {
247
            /*
248
             * If the virtual offset is at the end of the current block,
249
             * adjust it to point to the start of the next one.  This
250
             * is needed when on-the-fly indexing has recorded a virtual
251
             * offset just before a new block has been started, and makes
252
             * on-the-fly and standard indexing give exactly the same results.
253
             *
254
             * In theory the two virtual offsets are equivalent, but pointing
255
             * to the end of a block is inefficient, and caused problems with
256
             * versions of HTSlib before 1.11 where bgzf_read() would
257
             * incorrectly return EOF.
258
             */
259
260
            // Assert that this is the last entry for the current block_number
261
0
            assert(i == mt->idx_cache.nentries - 1
262
0
                   || e[i].block_number < e[i + 1].block_number);
263
264
            // Work out where the next block starts.  For this entry, the
265
            // offset will be zero.
266
0
            uint64_t next_block_addr = mt->block_address + block_comp_len;
267
0
            if (hts_idx_push(mt->hts_idx, e[i].tid, e[i].beg, e[i].end,
268
0
                             next_block_addr << 16, e[i].is_mapped) < 0) {
269
0
                pthread_mutex_unlock(&mt->idx_m);
270
0
                return -1;
271
0
            }
272
            // Count this entry and drop out of the loop
273
0
            i++;
274
0
            break;
275
0
        }
276
277
0
        if (hts_idx_push(mt->hts_idx, e[i].tid, e[i].beg, e[i].end,
278
0
                         (mt->block_address << 16) + e[i].offset,
279
0
                         e[i].is_mapped) < 0) {
280
0
            pthread_mutex_unlock(&mt->idx_m);
281
0
            return -1;
282
0
        }
283
0
    }
284
285
0
    memmove(&e[0], &e[i], (mt->idx_cache.nentries - i) * sizeof(*e));
286
0
    mt->idx_cache.nentries -= i;
287
0
    mt->block_written++;
288
289
0
    pthread_mutex_unlock(&mt->idx_m);
290
0
    return 0;
291
0
}
292
293
void bgzf_index_destroy(BGZF *fp);
294
int bgzf_index_add_block(BGZF *fp);
295
static int mt_destroy(mtaux_t *mt);
296
297
static inline void packInt16(uint8_t *buffer, uint16_t value)
298
33.7k
{
299
33.7k
    buffer[0] = value;
300
33.7k
    buffer[1] = value >> 8;
301
33.7k
}
302
303
static inline int unpackInt16(const uint8_t *buffer)
304
1.57k
{
305
1.57k
    return buffer[0] | buffer[1] << 8;
306
1.57k
}
307
308
static inline void packInt32(uint8_t *buffer, uint32_t value)
309
67.4k
{
310
67.4k
    buffer[0] = value;
311
67.4k
    buffer[1] = value >> 8;
312
67.4k
    buffer[2] = value >> 16;
313
67.4k
    buffer[3] = value >> 24;
314
67.4k
}
315
316
static void razf_info(hFILE *hfp, const char *filename)
317
49
{
318
49
    uint64_t usize, csize;
319
49
    off_t sizes_pos;
320
321
49
    if (filename == NULL || strcmp(filename, "-") == 0) filename = "FILE";
322
323
    // RAZF files end with USIZE,CSIZE stored as big-endian uint64_t
324
49
    if ((sizes_pos = hseek(hfp, -16, SEEK_END)) < 0) goto no_sizes;
325
49
    if (hread(hfp, &usize, 8) != 8 || hread(hfp, &csize, 8) != 8) goto no_sizes;
326
49
    if (!ed_is_big()) ed_swap_8p(&usize), ed_swap_8p(&csize);
327
49
    if (csize >= sizes_pos) goto no_sizes; // Very basic validity check
328
329
1
    hts_log_error(
330
1
"To decompress this file, use the following commands:\n"
331
1
"    truncate -s %" PRIu64 " %s\n"
332
1
"    gunzip %s\n"
333
1
"The resulting uncompressed file should be %" PRIu64 " bytes in length.\n"
334
1
"If you do not have a truncate command, skip that step (though gunzip will\n"
335
1
"likely produce a \"trailing garbage ignored\" message, which can be ignored).",
336
1
                  csize, filename, filename, usize);
337
1
    return;
338
339
48
no_sizes:
340
48
    hts_log_error(
341
48
"To decompress this file, use the following command:\n"
342
48
"    gunzip %s\n"
343
48
"This will likely produce a \"trailing garbage ignored\" message, which can\n"
344
48
"usually be safely ignored.", filename);
345
48
}
346
347
static const char *bgzf_zerr(int errnum, z_stream *zs)
348
312
{
349
312
    static char buffer[32];
350
351
    /* Return zs->msg if available.
352
       zlib doesn't set this very reliably.  Looking at the source suggests
353
       that it may get set to a useful message for deflateInit2, inflateInit2
354
       and inflate when it returns Z_DATA_ERROR. For inflate with other
355
       return codes, deflate, deflateEnd and inflateEnd it doesn't appear
356
       to be useful.  For the likely non-useful cases, the caller should
357
       pass NULL into zs. */
358
359
312
    if (zs && zs->msg) return zs->msg;
360
361
    // gzerror OF((gzFile file, int *errnum)
362
15
    switch (errnum) {
363
12
    case Z_ERRNO:
364
12
        return strerror(errno);
365
0
    case Z_STREAM_ERROR:
366
0
        return "invalid parameter/compression level, or inconsistent stream state";
367
0
    case Z_DATA_ERROR:
368
0
        return "invalid or incomplete IO";
369
0
    case Z_MEM_ERROR:
370
0
        return "out of memory";
371
3
    case Z_BUF_ERROR:
372
3
        return "progress temporarily not possible, or in() / out() returned an error";
373
0
    case Z_VERSION_ERROR:
374
0
        return "zlib version mismatch";
375
0
    case Z_NEED_DICT:
376
0
        return "data was compressed using a dictionary";
377
0
    case Z_OK: // 0: maybe gzgets error Z_NULL
378
0
    default:
379
0
        snprintf(buffer, sizeof(buffer), "[%d] unknown", errnum);
380
0
        return buffer;  // FIXME: Not thread-safe.
381
15
    }
382
15
}
383
384
static BGZF *bgzf_read_init(hFILE *hfpr, const char *filename)
385
6.03k
{
386
6.03k
    BGZF *fp;
387
6.03k
    uint8_t magic[18];
388
6.03k
    ssize_t n = hpeek(hfpr, magic, 18);
389
6.03k
    if (n < 0) return NULL;
390
391
6.03k
    fp = (BGZF*)calloc(1, sizeof(BGZF));
392
6.03k
    if (fp == NULL) return NULL;
393
394
6.03k
    fp->is_write = 0;
395
6.03k
    fp->uncompressed_block = hts_malloc_p(2, BGZF_MAX_BLOCK_SIZE);
396
6.03k
    if (fp->uncompressed_block == NULL) { free(fp); return NULL; }
397
6.03k
    fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
398
6.03k
    fp->is_compressed = (n==18 && magic[0]==0x1f && magic[1]==0x8b);
399
6.03k
    fp->is_gzip = ( !fp->is_compressed || ((magic[3]&4) && memcmp(&magic[12], "BC\2\0",4)==0) ) ? 0 : 1;
400
6.03k
    if (fp->is_compressed && (magic[3]&4) && memcmp(&magic[12], "RAZF", 4)==0) {
401
49
        hts_log_error("Cannot decompress legacy RAZF format");
402
49
        razf_info(hfpr, filename);
403
49
        free(fp->uncompressed_block);
404
49
        free(fp);
405
49
        errno = EFTYPE;
406
49
        return NULL;
407
49
    }
408
409
5.99k
    if (!(fp->cache = malloc(sizeof(*fp->cache)))) {
410
0
        free(fp->uncompressed_block);
411
0
        free(fp);
412
0
        return NULL;
413
0
    }
414
5.99k
    if (!(fp->cache->h = kh_init(bgzf_cache))) {
415
0
        free(fp->uncompressed_block);
416
0
        free(fp->cache);
417
0
        free(fp);
418
0
        return NULL;
419
0
    }
420
5.99k
    fp->cache->last_pos = 0;
421
5.99k
    fp->cache->private_data = NULL;
422
5.99k
    fp->cache->private_data_cleanup = (bgzf_private_data_cleanup_func *) NULL;
423
5.99k
    return fp;
424
5.99k
}
425
426
// get the compress level from the mode string: compress_level==-1 for the default level, -2 plain uncompressed
427
static int mode2level(const char *mode)
428
12.1k
{
429
12.1k
    int i, compress_level = -1;
430
36.3k
    for (i = 0; mode[i]; ++i)
431
24.2k
        if (mode[i] >= '0' && mode[i] <= '9') break;
432
12.1k
    if (mode[i]) compress_level = (int)mode[i] - '0';
433
12.1k
    if (strchr(mode, 'u')) compress_level = -2;
434
12.1k
    return compress_level;
435
12.1k
}
436
static BGZF *bgzf_write_init(const char *mode)
437
12.1k
{
438
12.1k
    BGZF *fp;
439
12.1k
    fp = (BGZF*)calloc(1, sizeof(BGZF));
440
12.1k
    if (fp == NULL) goto mem_fail;
441
12.1k
    fp->is_write = 1;
442
443
12.1k
    fp->cache = malloc(sizeof(bgzf_cache_t));
444
12.1k
    if (!fp->cache)
445
0
        goto mem_fail;
446
12.1k
    fp->cache->h = NULL;
447
12.1k
    fp->cache->last_pos = 0;
448
12.1k
    fp->cache->private_data = NULL;
449
12.1k
    fp->cache->private_data_cleanup = (bgzf_private_data_cleanup_func *) NULL;
450
451
12.1k
    int compress_level = mode2level(mode);
452
12.1k
    if ( compress_level==-2 )
453
0
    {
454
0
        fp->is_compressed = 0;
455
0
        return fp;
456
0
    }
457
12.1k
    fp->is_compressed = 1;
458
459
12.1k
    fp->uncompressed_block = hts_malloc_p(2, BGZF_MAX_BLOCK_SIZE);
460
12.1k
    if (fp->uncompressed_block == NULL) goto mem_fail;
461
12.1k
    fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
462
463
12.1k
    fp->compress_level = compress_level < 0? Z_DEFAULT_COMPRESSION : compress_level; // Z_DEFAULT_COMPRESSION==-1
464
12.1k
    if (fp->compress_level > 9) fp->compress_level = Z_DEFAULT_COMPRESSION;
465
12.1k
    if ( strchr(mode,'g') )
466
0
    {
467
        // gzip output
468
0
        fp->is_gzip = 1;
469
0
        fp->gz_stream = (z_stream*)calloc(1,sizeof(z_stream));
470
0
        if (fp->gz_stream == NULL) goto mem_fail;
471
0
        fp->gz_stream->zalloc = NULL;
472
0
        fp->gz_stream->zfree  = NULL;
473
0
        fp->gz_stream->msg    = NULL;
474
475
0
        int ret = deflateInit2(fp->gz_stream, fp->compress_level, Z_DEFLATED, 15|16, 8, Z_DEFAULT_STRATEGY);
476
0
        if (ret!=Z_OK) {
477
0
            hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret, fp->gz_stream));
478
0
            goto fail;
479
0
        }
480
0
    }
481
12.1k
    return fp;
482
483
0
mem_fail:
484
0
    hts_log_error("%s", strerror(errno));
485
486
0
fail:
487
0
    if (fp != NULL) {
488
0
        free(fp->cache);
489
0
        free(fp->uncompressed_block);
490
0
        free(fp->gz_stream);
491
0
        free(fp);
492
0
    }
493
0
    return NULL;
494
0
}
495
496
BGZF *bgzf_open(const char *path, const char *mode)
497
2
{
498
2
    BGZF *fp = 0;
499
2
    if (strchr(mode, 'r')) {
500
2
        hFILE *fpr;
501
2
        if ((fpr = hopen(path, mode)) == 0) return 0;
502
0
        fp = bgzf_read_init(fpr, path);
503
0
        if (fp == 0) { hclose_abruptly(fpr); return NULL; }
504
0
        fp->fp = fpr;
505
0
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
506
0
        hFILE *fpw;
507
0
        if ((fpw = hopen(path, mode)) == 0) return 0;
508
0
        fp = bgzf_write_init(mode);
509
0
        if (fp == NULL) return NULL;
510
0
        fp->fp = fpw;
511
0
    }
512
0
    else { errno = EINVAL; return 0; }
513
514
0
    fp->is_be = ed_is_big();
515
0
    return fp;
516
2
}
517
518
BGZF *bgzf_dopen(int fd, const char *mode)
519
0
{
520
0
    BGZF *fp = 0;
521
0
    if (strchr(mode, 'r')) {
522
0
        hFILE *fpr;
523
0
        if ((fpr = hdopen(fd, mode)) == 0) return 0;
524
0
        fp = bgzf_read_init(fpr, NULL);
525
0
        if (fp == 0) { hclose_abruptly(fpr); return NULL; } // FIXME this closes fd
526
0
        fp->fp = fpr;
527
0
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
528
0
        hFILE *fpw;
529
0
        if ((fpw = hdopen(fd, mode)) == 0) return 0;
530
0
        fp = bgzf_write_init(mode);
531
0
        if (fp == NULL) return NULL;
532
0
        fp->fp = fpw;
533
0
    }
534
0
    else { errno = EINVAL; return 0; }
535
536
0
    fp->is_be = ed_is_big();
537
0
    return fp;
538
0
}
539
540
BGZF *bgzf_hopen(hFILE *hfp, const char *mode)
541
18.1k
{
542
18.1k
    BGZF *fp = NULL;
543
18.1k
    if (strchr(mode, 'r')) {
544
6.03k
        fp = bgzf_read_init(hfp, NULL);
545
6.03k
        if (fp == NULL) return NULL;
546
12.1k
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
547
12.1k
        fp = bgzf_write_init(mode);
548
12.1k
        if (fp == NULL) return NULL;
549
12.1k
    }
550
0
    else { errno = EINVAL; return 0; }
551
552
18.0k
    fp->fp = hfp;
553
18.0k
    fp->is_be = ed_is_big();
554
18.0k
    return fp;
555
18.1k
}
556
557
#ifdef HAVE_LIBDEFLATE
558
uint32_t hts_crc32(uint32_t crc, const void *buf, size_t len) {
559
    return libdeflate_crc32(crc, buf, len);
560
}
561
562
int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
563
{
564
    if (slen == 0) {
565
        // EOF block
566
        if (*dlen < 28) return -1;
567
        memcpy(_dst, "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", 28);
568
        *dlen = 28;
569
        return 0;
570
    }
571
572
    uint8_t *dst = (uint8_t*)_dst;
573
574
    if (level == 0) {
575
        // Uncompressed data
576
        if (*dlen < slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH) return -1;
577
        dst[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
578
        u16_to_le(slen,  &dst[BLOCK_HEADER_LENGTH+1]); // length
579
        u16_to_le(~slen, &dst[BLOCK_HEADER_LENGTH+3]); // ones-complement length
580
        memcpy(dst + BLOCK_HEADER_LENGTH+5, src, slen);
581
        *dlen = slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
582
583
    } else {
584
        level = level > 0 ? level : 6; // libdeflate doesn't honour -1 as default
585
        // NB levels go up to 12 here.
586
        int lvl_map[] = {0,1,2,3,5,6,7,8,10,12};
587
        level = lvl_map[level>9 ?9 :level];
588
        struct libdeflate_compressor *z = libdeflate_alloc_compressor(level);
589
        if (!z) return -1;
590
591
        // Raw deflate
592
        size_t clen =
593
            libdeflate_deflate_compress(z, src, slen,
594
                                        dst + BLOCK_HEADER_LENGTH,
595
                                        *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH);
596
597
        if (clen <= 0) {
598
            hts_log_error("Call to libdeflate_deflate_compress failed");
599
            libdeflate_free_compressor(z);
600
            return -1;
601
        }
602
603
        *dlen = clen + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
604
605
        libdeflate_free_compressor(z);
606
    }
607
608
    // write the header
609
    memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
610
    packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
611
612
    // write the footer
613
    uint32_t crc = libdeflate_crc32(0, src, slen);
614
    packInt32((uint8_t*)&dst[*dlen - 8], crc);
615
    packInt32((uint8_t*)&dst[*dlen - 4], slen);
616
    return 0;
617
}
618
619
#else
620
621
0
uint32_t hts_crc32(uint32_t crc, const void *buf, size_t len) {
622
0
    return crc32(crc, buf, len);
623
0
}
624
625
int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
626
33.7k
{
627
33.7k
    uint32_t crc;
628
33.7k
    z_stream zs;
629
33.7k
    uint8_t *dst = (uint8_t*)_dst;
630
631
33.7k
    if (level == 0) {
632
0
    uncomp:
633
        // Uncompressed data
634
0
        if (*dlen < slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH) return -1;
635
0
        dst[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
636
0
        u16_to_le(slen,  &dst[BLOCK_HEADER_LENGTH+1]); // length
637
0
        u16_to_le(~slen, &dst[BLOCK_HEADER_LENGTH+3]); // ones-complement length
638
0
        memcpy(dst + BLOCK_HEADER_LENGTH+5, src, slen);
639
0
        *dlen = slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
640
33.7k
    } else {
641
        // compress the body
642
33.7k
        zs.zalloc = NULL; zs.zfree = NULL;
643
33.7k
        zs.msg = NULL;
644
33.7k
        zs.next_in  = (Bytef*)src;
645
33.7k
        zs.avail_in = slen;
646
33.7k
        zs.next_out = dst + BLOCK_HEADER_LENGTH;
647
33.7k
        zs.avail_out = *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
648
33.7k
        int ret = deflateInit2(&zs, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); // -15 to disable zlib header/footer
649
33.7k
        if (ret!=Z_OK) {
650
0
            hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret, &zs));
651
0
            return -1;
652
0
        }
653
33.7k
        if ((ret = deflate(&zs, Z_FINISH)) != Z_STREAM_END) {
654
0
            if (ret == Z_OK && zs.avail_out == 0) {
655
0
                deflateEnd(&zs);
656
0
                goto uncomp;
657
0
            } else {
658
0
                hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
659
0
            }
660
0
            return -1;
661
0
        }
662
        // If we used up the entire output buffer, then we either ran out of
663
        // room or we *just* fitted, but either way we may as well store
664
        // uncompressed for faster decode.
665
33.7k
        if (zs.avail_out == 0) {
666
0
            deflateEnd(&zs);
667
0
            goto uncomp;
668
0
        }
669
33.7k
        if ((ret = deflateEnd(&zs)) != Z_OK) {
670
0
            hts_log_error("Call to deflateEnd failed: %s", bgzf_zerr(ret, NULL));
671
0
            return -1;
672
0
        }
673
33.7k
        *dlen = zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
674
33.7k
    }
675
676
    // write the header
677
33.7k
    memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
678
33.7k
    packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
679
    // write the footer
680
33.7k
    crc = crc32(crc32(0L, NULL, 0L), (Bytef*)src, slen);
681
33.7k
    packInt32((uint8_t*)&dst[*dlen - 8], crc);
682
33.7k
    packInt32((uint8_t*)&dst[*dlen - 4], slen);
683
33.7k
    return 0;
684
33.7k
}
685
#endif // HAVE_LIBDEFLATE
686
687
static int bgzf_gzip_compress(BGZF *fp, void *_dst, size_t *dlen, const void *src, size_t slen, int level)
688
0
{
689
0
    uint8_t *dst = (uint8_t*)_dst;
690
0
    z_stream *zs = fp->gz_stream;
691
0
    int flush = slen ? Z_PARTIAL_FLUSH : Z_FINISH;
692
0
    zs->next_in   = (Bytef*)src;
693
0
    zs->avail_in  = slen;
694
0
    zs->next_out  = dst;
695
0
    zs->avail_out = *dlen;
696
0
    int ret = deflate(zs, flush);
697
0
    if (ret == Z_STREAM_ERROR) {
698
0
        hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret, NULL));
699
0
        return -1;
700
0
    }
701
0
    if (zs->avail_in != 0) {
702
0
        hts_log_error("Deflate block too large for output buffer");
703
0
        return -1;
704
0
    }
705
0
    *dlen = *dlen - zs->avail_out;
706
0
    return 0;
707
0
}
708
709
// Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
710
static int deflate_block(BGZF *fp, int block_length)
711
33.7k
{
712
33.7k
    size_t comp_size = BGZF_MAX_BLOCK_SIZE;
713
33.7k
    int ret;
714
33.7k
    if ( !fp->is_gzip )
715
33.7k
        ret = bgzf_compress(fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
716
0
    else
717
0
        ret = bgzf_gzip_compress(fp, fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
718
719
33.7k
    if ( ret != 0 )
720
0
    {
721
0
        hts_log_debug("Compression error %d", ret);
722
0
        fp->errcode |= BGZF_ERR_ZLIB;
723
0
        return -1;
724
0
    }
725
33.7k
    fp->block_offset = 0;
726
33.7k
    return comp_size;
727
33.7k
}
728
729
#ifdef HAVE_LIBDEFLATE
730
731
static int bgzf_uncompress(uint8_t *dst, size_t *dlen,
732
                           const uint8_t *src, size_t slen,
733
                           uint32_t expected_crc) {
734
    struct libdeflate_decompressor *z = libdeflate_alloc_decompressor();
735
    if (!z) {
736
        hts_log_error("Call to libdeflate_alloc_decompressor failed");
737
        return -1;
738
    }
739
740
    int ret = libdeflate_deflate_decompress(z, src, slen, dst, *dlen, dlen);
741
    libdeflate_free_decompressor(z);
742
743
    if (ret != LIBDEFLATE_SUCCESS) {
744
        hts_log_error("Inflate operation failed: %d", ret);
745
        return -1;
746
    }
747
748
    uint32_t crc = libdeflate_crc32(0, (unsigned char *)dst, *dlen);
749
#ifdef FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION
750
    // Pretend the CRC was OK so the fuzzer doesn't have to get it right
751
    crc = expected_crc;
752
#endif
753
    if (crc != expected_crc) {
754
        hts_log_error("CRC32 checksum mismatch");
755
        return -2;
756
    }
757
758
    return 0;
759
}
760
761
#else
762
763
static int bgzf_uncompress(uint8_t *dst, size_t *dlen,
764
                           const uint8_t *src, size_t slen,
765
137
                           uint32_t expected_crc) {
766
137
    z_stream zs = {
767
137
        .zalloc = NULL,
768
137
        .zfree = NULL,
769
137
        .msg = NULL,
770
137
        .next_in = (Bytef*)src,
771
137
        .avail_in = slen,
772
137
        .next_out = (Bytef*)dst,
773
137
        .avail_out = *dlen
774
137
    };
775
776
137
    int ret = inflateInit2(&zs, -15);
777
137
    if (ret != Z_OK) {
778
0
        hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret, &zs));
779
0
        return -1;
780
0
    }
781
137
    if ((ret = inflate(&zs, Z_FINISH)) != Z_STREAM_END) {
782
12
        hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
783
12
        if ((ret = inflateEnd(&zs)) != Z_OK) {
784
0
            hts_log_warning("Call to inflateEnd failed: %s", bgzf_zerr(ret, NULL));
785
0
        }
786
12
        return -1;
787
12
    }
788
125
    if ((ret = inflateEnd(&zs)) != Z_OK) {
789
0
        hts_log_error("Call to inflateEnd failed: %s", bgzf_zerr(ret, NULL));
790
0
        return -1;
791
0
    }
792
125
    *dlen = *dlen - zs.avail_out;
793
794
125
    uint32_t crc = crc32(crc32(0L, NULL, 0L), (unsigned char *)dst, *dlen);
795
125
#ifdef FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION
796
    // Pretend the CRC was OK so the fuzzer doesn't have to get it right
797
125
    crc = expected_crc;
798
125
#endif
799
125
    if (crc != expected_crc) {
800
0
        hts_log_error("CRC32 checksum mismatch");
801
0
        return -2;
802
0
    }
803
804
125
    return 0;
805
125
}
806
#endif // HAVE_LIBDEFLATE
807
808
// Inflate the block in fp->compressed_block into fp->uncompressed_block
809
static int inflate_block(BGZF* fp, int block_length)
810
137
{
811
137
    size_t dlen = BGZF_MAX_BLOCK_SIZE;
812
137
    uint32_t crc = le_to_u32((uint8_t *)fp->compressed_block + block_length-8);
813
137
    int ret = bgzf_uncompress(fp->uncompressed_block, &dlen,
814
137
                              (Bytef*)fp->compressed_block + 18,
815
137
                              block_length - 18, crc);
816
137
    if (ret < 0) {
817
12
        if (ret == -2)
818
0
            fp->errcode |= BGZF_ERR_CRC;
819
12
        else
820
12
            fp->errcode |= BGZF_ERR_ZLIB;
821
12
        return -1;
822
12
    }
823
824
125
    return dlen;
825
137
}
826
827
// Decompress the next part of a non-blocked GZIP file.
828
// Return the number of uncompressed bytes read, 0 on EOF, or a negative number on error.
829
// Will fill the output buffer unless the end of the GZIP file is reached.
830
static int inflate_gzip_block(BGZF *fp)
831
45.5k
{
832
    // we will set this to true when we detect EOF, so we don't bang against the EOF more than once per call
833
45.5k
    int input_eof = 0;
834
835
    // write to the part of the output buffer after block_offset
836
45.5k
    fp->gz_stream->next_out = (Bytef*)fp->uncompressed_block + fp->block_offset;
837
45.5k
    fp->gz_stream->avail_out = BGZF_MAX_BLOCK_SIZE - fp->block_offset;
838
839
91.6k
    while ( fp->gz_stream->avail_out != 0 ) {
840
        // until we fill the output buffer (or hit EOF)
841
842
46.9k
        if ( !input_eof && fp->gz_stream->avail_in == 0 ) {
843
            // we are out of input data in the buffer. Get more.
844
1.27k
            fp->gz_stream->next_in = fp->compressed_block;
845
1.27k
            int ret = hread(fp->fp, fp->compressed_block, BGZF_BLOCK_SIZE);
846
1.27k
            if ( ret < 0 ) {
847
                // hread had an error. Pass it on.
848
0
                return ret;
849
0
            }
850
1.27k
            fp->gz_stream->avail_in = ret;
851
1.27k
            if ( fp->gz_stream->avail_in < BGZF_BLOCK_SIZE ) {
852
                // we have reached EOF but the decompressor hasn't necessarily
853
955
                input_eof = 1;
854
955
            }
855
1.27k
        }
856
857
46.9k
        fp->gz_stream->msg = NULL;
858
        // decompress as much data as we can
859
46.9k
        int ret = inflate(fp->gz_stream, Z_SYNC_FLUSH);
860
861
46.9k
        if ( (ret < 0 && ret != Z_BUF_ERROR) || ret == Z_NEED_DICT ) {
862
            // an error occurred, other than running out of space
863
288
            hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? fp->gz_stream : NULL));
864
288
            fp->errcode |= BGZF_ERR_ZLIB;
865
288
            return -1;
866
46.6k
        } else if ( ret == Z_STREAM_END ) {
867
            // we finished a GZIP member
868
869
            // scratch for peeking to see if the file is over
870
60
            char c;
871
60
            if (fp->gz_stream->avail_in > 0 || hpeek(fp->fp, &c, 1) == 1) {
872
                // there is more data; try and read another GZIP member in the remaining data
873
60
                int reset_ret = inflateReset(fp->gz_stream);
874
60
                if (reset_ret != Z_OK) {
875
0
                    hts_log_error("Call to inflateReset failed: %s", bgzf_zerr(reset_ret, NULL));
876
0
                    fp->errcode |= BGZF_ERR_ZLIB;
877
0
                    return -1;
878
0
                }
879
60
            } else {
880
                // we consumed all the input data and hit Z_STREAM_END
881
                // so stop looping, even if we never fill the output buffer
882
0
                break;
883
0
            }
884
46.5k
        } else if ( ret == Z_BUF_ERROR && input_eof && fp->gz_stream->avail_out > 0 ) {
885
            // the gzip file has ended prematurely
886
599
            hts_log_error("Gzip file truncated");
887
599
            fp->errcode |= BGZF_ERR_IO;
888
599
            return -1;
889
599
        }
890
46.9k
    }
891
892
    // when we get here, the buffer is full or there is an EOF after a complete gzip member
893
44.6k
    return BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
894
45.5k
}
895
896
// Returns: 0 on success (BGZF header); -1 on non-BGZF GZIP header; -2 on error
897
static int check_header(const uint8_t *header)
898
1.59k
{
899
1.59k
    if ( header[0] != 31 || header[1] != 139 || header[2] != 8 ) return -2;
900
1.57k
    return ((header[3] & 4) != 0
901
1.00k
            && unpackInt16((uint8_t*)&header[10]) == 6
902
983
            && header[12] == 'B' && header[13] == 'C'
903
1.41k
            && unpackInt16((uint8_t*)&header[14]) == 2) ? 0 : -1;
904
1.59k
}
905
906
static void free_cache(BGZF *fp)
907
18.0k
{
908
18.0k
    khint_t k;
909
18.0k
    if (fp->cache->h) {
910
5.99k
        khash_t(bgzf_cache) *h = fp->cache->h;
911
5.99k
        for (k = kh_begin(h); k < kh_end(h); ++k)
912
0
            if (kh_exist(h, k)) free(kh_val(h, k).block);
913
5.99k
        kh_destroy(bgzf_cache, h);
914
5.99k
    }
915
18.0k
    bgzf_clear_private_data(fp);
916
18.0k
    free(fp->cache);
917
18.0k
}
918
919
static int load_block_from_cache(BGZF *fp, int64_t block_address)
920
0
{
921
0
    khint_t k;
922
0
    cache_t *p;
923
924
0
    khash_t(bgzf_cache) *h = fp->cache->h;
925
0
    k = kh_get(bgzf_cache, h, block_address);
926
0
    if (k == kh_end(h)) return 0;
927
0
    p = &kh_val(h, k);
928
0
    if (fp->block_length != 0) fp->block_offset = 0;
929
0
    fp->block_address = block_address;
930
0
    fp->block_length = p->size;
931
0
    memcpy(fp->uncompressed_block, p->block, p->size);
932
0
    if ( hseek(fp->fp, p->end_offset, SEEK_SET) < 0 )
933
0
    {
934
        // todo: move the error up
935
0
        hts_log_error("Could not hseek to %" PRId64, p->end_offset);
936
0
        exit(1);
937
0
    }
938
0
    return p->size;
939
0
}
940
941
static void cache_block(BGZF *fp, int size)
942
111
{
943
111
    int ret;
944
111
    khint_t k, k_orig;
945
111
    uint8_t *block = NULL;
946
111
    cache_t *p;
947
    //fprintf(stderr, "Cache block at %llx\n", (int)fp->block_address);
948
111
    khash_t(bgzf_cache) *h = fp->cache->h;
949
111
    if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return;
950
0
    if (fp->block_length < 0 || fp->block_length > BGZF_MAX_BLOCK_SIZE) return;
951
0
    if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > (uint32_t)fp->cache_size) {
952
        /* Remove uniformly from any position in the hash by a simple
953
         * round-robin approach.  An alternative strategy would be to
954
         * remove the least recently accessed block, but the round-robin
955
         * removal is simpler and is not expected to have a big impact
956
         * on performance */
957
0
        if (fp->cache->last_pos >= kh_end(h)) fp->cache->last_pos = kh_begin(h);
958
0
        k_orig = k = fp->cache->last_pos;
959
0
        if (++k >= kh_end(h)) k = kh_begin(h);
960
0
        while (k != k_orig) {
961
0
            if (kh_exist(h, k))
962
0
                break;
963
0
            if (++k == kh_end(h))
964
0
                k = kh_begin(h);
965
0
        }
966
0
        fp->cache->last_pos = k;
967
968
0
        if (k != k_orig) {
969
0
            block = kh_val(h, k).block;
970
0
            kh_del(bgzf_cache, h, k);
971
0
        }
972
0
    } else {
973
0
        block = (uint8_t*)malloc(BGZF_MAX_BLOCK_SIZE);
974
0
    }
975
0
    if (!block) return;
976
0
    k = kh_put(bgzf_cache, h, fp->block_address, &ret);
977
0
    if (ret <= 0) { // kh_put failed, or in there already (shouldn't happen)
978
0
        free(block);
979
0
        return;
980
0
    }
981
0
    p = &kh_val(h, k);
982
0
    p->size = fp->block_length;
983
0
    p->end_offset = fp->block_address + size;
984
0
    p->block = block;
985
0
    memcpy(p->block, fp->uncompressed_block, p->size);
986
0
}
987
988
/*
989
 * Absolute htell in this compressed file.
990
 *
991
 * Do not confuse with the external bgzf_tell macro which returns the virtual
992
 * offset.
993
 */
994
95.7k
static off_t bgzf_htell(BGZF *fp) {
995
95.7k
    if (fp->mt) {
996
0
        pthread_mutex_lock(&fp->mt->job_pool_m);
997
0
        off_t pos = fp->block_address + fp->block_clength;
998
0
        pthread_mutex_unlock(&fp->mt->job_pool_m);
999
0
        return pos;
1000
95.7k
    } else {
1001
95.7k
        return htell(fp->fp);
1002
95.7k
    }
1003
95.7k
}
1004
1005
int bgzf_read_block(BGZF *fp)
1006
49.8k
{
1007
49.8k
    hts_tpool_result *r;
1008
1009
49.8k
    if (fp->errcode) return -1;
1010
1011
49.8k
    if (fp->mt) {
1012
0
    again:
1013
0
        if (fp->mt->hit_eof) {
1014
            // Further reading at EOF will always return 0
1015
0
            fp->block_length = 0;
1016
0
            return 0;
1017
0
        }
1018
0
        r = hts_tpool_next_result_wait(fp->mt->out_queue);
1019
0
        bgzf_job *j = r ? (bgzf_job *)hts_tpool_result_data(r) : NULL;
1020
1021
0
        if (!j || j->errcode == BGZF_ERR_MT) {
1022
0
            if (!fp->mt->free_block) {
1023
0
                fp->uncompressed_block = hts_malloc_p(2, BGZF_MAX_BLOCK_SIZE);
1024
0
                if (fp->uncompressed_block == NULL) return -1;
1025
0
                fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
1026
0
            } // else it's already allocated with malloc, maybe even in-use.
1027
0
            if (mt_destroy(fp->mt) < 0) {
1028
0
                fp->errcode = BGZF_ERR_IO;
1029
0
            }
1030
0
            fp->mt = NULL;
1031
0
            hts_tpool_delete_result(r, 0);
1032
0
            if (fp->errcode) {
1033
0
                return -1;
1034
0
            }
1035
0
            goto single_threaded;
1036
0
        }
1037
1038
0
        if (j->errcode) {
1039
0
            fp->errcode = j->errcode;
1040
0
            hts_log_error("BGZF decode jobs returned error %d "
1041
0
                          "for block offset %"PRId64,
1042
0
                          j->errcode, j->block_address);
1043
0
            hts_tpool_delete_result(r, 0);
1044
0
            return -1;
1045
0
        }
1046
1047
0
        if (j->hit_eof) {
1048
0
            if (!fp->last_block_eof && !fp->no_eof_block) {
1049
0
                fp->no_eof_block = 1;
1050
0
                hts_log_warning("EOF marker is absent. The input may be truncated");
1051
0
            }
1052
0
            fp->mt->hit_eof = 1;
1053
0
        }
1054
1055
        // Zero length blocks in the middle of a file are (wrongly)
1056
        // considered as EOF by many callers.  We work around this by
1057
        // trying again to see if we hit a genuine EOF.
1058
0
        if (!j->hit_eof && j->uncomp_len == 0) {
1059
0
            fp->last_block_eof = 1;
1060
0
            hts_tpool_delete_result(r, 0);
1061
0
            goto again;
1062
0
        }
1063
1064
        // block_length=0 and block_offset set by bgzf_seek.
1065
0
        if (fp->block_length != 0) fp->block_offset = 0;
1066
0
        if (!j->hit_eof) fp->block_address = j->block_address;
1067
0
        fp->block_clength = j->comp_len;
1068
0
        fp->block_length = j->uncomp_len;
1069
        // bgzf_read() can change fp->block_length
1070
0
        fp->last_block_eof = (fp->block_length == 0);
1071
1072
0
        if ( j->uncomp_len && j->fp->idx_build_otf )
1073
0
        {
1074
0
            bgzf_index_add_block(j->fp);
1075
0
            j->fp->idx->ublock_addr += j->uncomp_len;
1076
0
        }
1077
1078
        // Steal the data block as it's quicker than a memcpy.
1079
        // We just need to make sure we delay the pool free.
1080
0
        if (fp->mt->curr_job) {
1081
0
            pthread_mutex_lock(&fp->mt->job_pool_m);
1082
0
            pool_free(fp->mt->job_pool, fp->mt->curr_job);
1083
0
            pthread_mutex_unlock(&fp->mt->job_pool_m);
1084
0
        }
1085
0
        fp->uncompressed_block = j->uncomp_data;
1086
0
        fp->mt->curr_job = j;
1087
0
        if (fp->mt->free_block) {
1088
0
            free(fp->mt->free_block); // clear up last non-mt block
1089
0
            fp->mt->free_block = NULL;
1090
0
        }
1091
1092
0
        hts_tpool_delete_result(r, 0);
1093
0
        return 0;
1094
0
    }
1095
1096
49.8k
    uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
1097
49.8k
    int count, size, block_length, remaining;
1098
1099
49.8k
 single_threaded:
1100
49.8k
    size = 0;
1101
1102
49.8k
    int64_t block_address;
1103
49.8k
    block_address = bgzf_htell(fp);
1104
1105
    // Reading an uncompressed file
1106
49.8k
    if ( !fp->is_compressed )
1107
4.03k
    {
1108
4.03k
        count = hread(fp->fp, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
1109
4.03k
        if (count < 0)  // Error
1110
0
        {
1111
0
            hts_log_error("Failed to read uncompressed data "
1112
0
                          "at offset %"PRId64"%s%s",
1113
0
                          block_address, errno ? ": " : "", strerror(errno));
1114
0
            fp->errcode |= BGZF_ERR_IO;
1115
0
            return -1;
1116
0
        }
1117
4.03k
        else if (count == 0)  // EOF
1118
1.10k
        {
1119
1.10k
            fp->block_length = 0;
1120
1.10k
            return 0;
1121
1.10k
        }
1122
2.93k
        if (fp->block_length != 0) fp->block_offset = 0;
1123
2.93k
        fp->block_address = block_address;
1124
2.93k
        fp->block_length = count;
1125
2.93k
        return 0;
1126
4.03k
    }
1127
1128
    // Reading compressed file
1129
45.8k
    if ( fp->is_gzip && fp->gz_stream ) // is this is an initialized gzip stream?
1130
44.1k
    {
1131
44.1k
        count = inflate_gzip_block(fp);
1132
44.1k
        if ( count<0 )
1133
800
        {
1134
800
            hts_log_error("Reading GZIP stream failed at offset %"PRId64,
1135
800
                          block_address);
1136
800
            fp->errcode |= BGZF_ERR_ZLIB;
1137
800
            return -1;
1138
800
        }
1139
43.3k
        fp->block_length = count;
1140
43.3k
        fp->block_address = block_address;
1141
43.3k
        return 0;
1142
44.1k
    }
1143
1.70k
    if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
1144
1145
    // loop to skip empty bgzf blocks
1146
1.71k
    while (1)
1147
1.71k
    {
1148
1.71k
        count = hread(fp->fp, header, sizeof(header));
1149
1.71k
        if (count == 0) { // no data read
1150
108
            if (!fp->last_block_eof && !fp->no_eof_block && !fp->is_gzip) {
1151
42
                fp->no_eof_block = 1;
1152
42
                hts_log_warning("EOF marker is absent. The input may be truncated");
1153
42
            }
1154
108
            fp->block_length = 0;
1155
108
            return 0;
1156
108
        }
1157
1.60k
        int ret = 0;
1158
1.60k
        if ( count != sizeof(header) || (ret=check_header(header))==-2 )
1159
33
        {
1160
33
            fp->errcode |= BGZF_ERR_HEADER;
1161
33
            hts_log_error("%s BGZF header at offset %"PRId64,
1162
33
                          ret ? "Invalid" : "Failed to read",
1163
33
                          block_address);
1164
33
            return -1;
1165
33
        }
1166
1.57k
        if ( ret==-1 )
1167
1.41k
        {
1168
            // GZIP, not BGZF
1169
1.41k
            uint8_t *cblock = (uint8_t*)fp->compressed_block;
1170
1.41k
            memcpy(cblock, header, sizeof(header));
1171
1.41k
            count = hread(fp->fp, cblock+sizeof(header), BGZF_BLOCK_SIZE - sizeof(header)) + sizeof(header);
1172
1173
1.41k
            fp->is_gzip = 1;
1174
1.41k
            fp->gz_stream = (z_stream*) calloc(1,sizeof(z_stream));
1175
            // Set up zlib, using a window size of 15, and its built-in GZIP header processing (+16).
1176
1.41k
            int ret = inflateInit2(fp->gz_stream, 15 + 16);
1177
1.41k
            if (ret != Z_OK)
1178
0
            {
1179
0
                hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret, fp->gz_stream));
1180
0
                fp->errcode |= BGZF_ERR_ZLIB;
1181
0
                return -1;
1182
0
            }
1183
1.41k
            fp->gz_stream->avail_in = count;
1184
1.41k
            fp->gz_stream->next_in  = cblock;
1185
1.41k
            count = inflate_gzip_block(fp);
1186
1.41k
            if ( count<0 )
1187
87
            {
1188
87
                hts_log_error("Reading GZIP stream failed at offset %"PRId64,
1189
87
                              block_address);
1190
87
                fp->errcode |= BGZF_ERR_ZLIB;
1191
87
                return -1;
1192
87
            }
1193
1.33k
            fp->block_length = count;
1194
1.33k
            fp->block_address = block_address;
1195
1.33k
            if ( fp->idx_build_otf ) return -1; // cannot build index for gzip
1196
1.33k
            return 0;
1197
1.33k
        }
1198
157
        size = count;
1199
157
        block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1200
157
        if (block_length < BLOCK_HEADER_LENGTH)
1201
5
        {
1202
5
            hts_log_error("Invalid BGZF block length at offset %"PRId64,
1203
5
                          block_address);
1204
5
            fp->errcode |= BGZF_ERR_HEADER;
1205
5
            return -1;
1206
5
        }
1207
152
        compressed_block = (uint8_t*)fp->compressed_block;
1208
152
        memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1209
152
        remaining = block_length - BLOCK_HEADER_LENGTH;
1210
152
        count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1211
152
        if (count != remaining) {
1212
15
            hts_log_error("Failed to read BGZF block data at offset %"PRId64
1213
15
                          " expected %d bytes; hread returned %d",
1214
15
                          block_address, remaining, count);
1215
15
            fp->errcode |= BGZF_ERR_IO;
1216
15
            return -1;
1217
15
        }
1218
137
        size += count;
1219
137
        if ((count = inflate_block(fp, block_length)) < 0) {
1220
12
            hts_log_debug("Inflate block operation failed for "
1221
12
                          "block at offset %"PRId64": %s",
1222
12
                          block_address, bgzf_zerr(count, NULL));
1223
12
            fp->errcode |= BGZF_ERR_ZLIB;
1224
12
            return -1;
1225
12
        }
1226
125
        fp->last_block_eof = (count == 0);
1227
125
        if ( count ) break;     // otherwise an empty bgzf block
1228
14
        block_address = bgzf_htell(fp); // update for new block start
1229
14
    }
1230
111
    if (fp->block_length != 0) fp->block_offset = 0; // Do not reset offset if this read follows a seek.
1231
111
    fp->block_address = block_address;
1232
111
    fp->block_length = count;
1233
111
    if ( fp->idx_build_otf )
1234
0
    {
1235
0
        bgzf_index_add_block(fp);
1236
0
        fp->idx->ublock_addr += count;
1237
0
    }
1238
111
    cache_block(fp, size);
1239
111
    return 0;
1240
1.70k
}
1241
1242
ssize_t bgzf_read(BGZF *fp, void *data, size_t length)
1243
17.0k
{
1244
17.0k
    ssize_t bytes_read = 0;
1245
17.0k
    uint8_t *output = (uint8_t*)data;
1246
17.0k
    if (length <= 0) return 0;
1247
17.0k
    assert(fp->is_write == 0);
1248
30.4k
    while (bytes_read < length) {
1249
16.2k
        int copy_length, available = fp->block_length - fp->block_offset;
1250
16.2k
        uint8_t *buffer;
1251
16.2k
        if (available <= 0) {
1252
4.21k
            int ret = bgzf_read_block(fp);
1253
4.21k
            if (ret != 0) {
1254
29
                hts_log_error("Read block operation failed with error %d after %zd of %zu bytes", fp->errcode, bytes_read, length);
1255
29
                fp->errcode |= BGZF_ERR_ZLIB;
1256
29
                return -1;
1257
29
            }
1258
4.18k
            available = fp->block_length - fp->block_offset;
1259
4.18k
            if (available == 0) {
1260
1.09k
                if (fp->block_length == 0)
1261
1.09k
                    break; // EOF
1262
1263
                // Offset was at end of block (see commit e9863a0)
1264
0
                fp->block_address = bgzf_htell(fp);
1265
0
                fp->block_offset = fp->block_length = 0;
1266
0
                continue;
1267
3.09k
            } else if (available < 0) {
1268
                // Block offset was set to an invalid coordinate
1269
0
                hts_log_error("BGZF block offset %d set beyond block size %d",
1270
0
                              fp->block_offset, fp->block_length);
1271
0
                fp->errcode |= BGZF_ERR_MISUSE;
1272
0
                return -1;
1273
0
            }
1274
4.18k
        }
1275
15.1k
        copy_length = length - bytes_read < available? length - bytes_read : available;
1276
15.1k
        buffer = (uint8_t*)fp->uncompressed_block;
1277
15.1k
        memcpy(output, buffer + fp->block_offset, copy_length);
1278
15.1k
        fp->block_offset += copy_length;
1279
15.1k
        output += copy_length;
1280
15.1k
        bytes_read += copy_length;
1281
1282
        // For raw gzip streams this avoids short reads.
1283
15.1k
        if (fp->block_offset == fp->block_length) {
1284
1.77k
            fp->block_address = bgzf_htell(fp);
1285
1.77k
            fp->block_offset = fp->block_length = 0;
1286
1.77k
        }
1287
15.1k
    }
1288
1289
15.2k
    fp->uncompressed_address += bytes_read;
1290
1291
15.2k
    return bytes_read;
1292
15.2k
}
1293
1294
// -1 for EOF, -2 for error, 0-255 for byte.
1295
2.21M
int bgzf_peek(BGZF *fp) {
1296
2.21M
    int available = fp->block_length - fp->block_offset;
1297
2.21M
    if (available <= 0) {
1298
54
        if (bgzf_read_block(fp) < 0) {
1299
3
            hts_log_error("Read block operation failed with error %d", fp->errcode);
1300
3
            fp->errcode = BGZF_ERR_ZLIB;
1301
3
            return -2;
1302
3
        }
1303
54
    }
1304
2.21M
    available = fp->block_length - fp->block_offset;
1305
2.21M
    if (available)
1306
2.21M
        return ((unsigned char *)fp->uncompressed_block)[fp->block_offset];
1307
1308
0
    return -1;
1309
2.21M
}
1310
1311
ssize_t bgzf_raw_read(BGZF *fp, void *data, size_t length)
1312
0
{
1313
0
    ssize_t ret = hread(fp->fp, data, length);
1314
0
    if (ret < 0) fp->errcode |= BGZF_ERR_IO;
1315
0
    return ret;
1316
0
}
1317
1318
#ifdef BGZF_MT
1319
1320
/* Function to clean up when jobs are discarded (e.g. during seek)
1321
 * This works for results too, as results are the same struct with
1322
 * decompressed data stored in it. */
1323
0
static void job_cleanup(void *arg) {
1324
0
    bgzf_job *j = (bgzf_job *)arg;
1325
0
    mtaux_t *mt = j->fp->mt;
1326
0
    pthread_mutex_lock(&mt->job_pool_m);
1327
0
    pool_free(mt->job_pool, j);
1328
0
    pthread_mutex_unlock(&mt->job_pool_m);
1329
0
}
1330
1331
0
static void *bgzf_encode_func(void *arg) {
1332
0
    bgzf_job *j = (bgzf_job *)arg;
1333
1334
0
    j->comp_len = BGZF_MAX_BLOCK_SIZE;
1335
0
    int ret = bgzf_compress(j->comp_data, &j->comp_len,
1336
0
                            j->uncomp_data, j->uncomp_len,
1337
0
                            j->fp->compress_level);
1338
0
    if (ret != 0)
1339
0
        j->errcode |= BGZF_ERR_ZLIB;
1340
1341
0
    return arg;
1342
0
}
1343
1344
// Optimisation for compression level 0 (uncompressed deflate blocks)
1345
// Avoids memcpy of the data from uncompressed to compressed buffer.
1346
0
static void *bgzf_encode_level0_func(void *arg) {
1347
0
    bgzf_job *j = (bgzf_job *)arg;
1348
0
    uint32_t crc;
1349
0
    j->comp_len = j->uncomp_len + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH + 5;
1350
1351
    // Data will have already been copied in to
1352
    // j->comp_data + BLOCK_HEADER_LENGTH + 5
1353
1354
    // Add preamble
1355
0
    memcpy(j->comp_data, g_magic, BLOCK_HEADER_LENGTH);
1356
0
    u16_to_le(j->comp_len-1, j->comp_data + 16);
1357
1358
    // Deflate uncompressed data header
1359
0
    j->comp_data[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
1360
0
    u16_to_le(j->uncomp_len, j->comp_data + BLOCK_HEADER_LENGTH + 1);
1361
0
    u16_to_le(~j->uncomp_len, j->comp_data + BLOCK_HEADER_LENGTH + 3);
1362
1363
    // Trailer (CRC, uncompressed length)
1364
0
    crc = hts_crc32(0, j->comp_data + BLOCK_HEADER_LENGTH + 5, j->uncomp_len);
1365
0
    u32_to_le(crc, j->comp_data +  j->comp_len - 8);
1366
0
    u32_to_le(j->uncomp_len, j->comp_data + j->comp_len - 4);
1367
1368
0
    return arg;
1369
0
}
1370
1371
// Our input block has already been decoded by bgzf_mt_read_block().
1372
// We need to split that into a fetch block (compressed) and make this
1373
// do the actual decompression step.
1374
0
static void *bgzf_decode_func(void *arg) {
1375
0
    bgzf_job *j = (bgzf_job *)arg;
1376
1377
0
    j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1378
0
    uint32_t crc = le_to_u32((uint8_t *)j->comp_data + j->comp_len-8);
1379
0
    int ret = bgzf_uncompress(j->uncomp_data, &j->uncomp_len,
1380
0
                              j->comp_data+18, j->comp_len-18, crc);
1381
0
    if (ret != 0)
1382
0
        j->errcode |= BGZF_ERR_ZLIB;
1383
1384
0
    return arg;
1385
0
}
1386
1387
/*
1388
 * Nul function so we can dispatch a job with the correct serial
1389
 * to mark failure or to indicate an empty read (EOF).
1390
 */
1391
0
static void *bgzf_nul_func(void *arg) { return arg; }
1392
1393
/*
1394
 * Takes compressed blocks off the results queue and calls hwrite to
1395
 * punt them to the output stream.
1396
 *
1397
 * Returns NULL when no more are left, or -1 on error
1398
 */
1399
0
static void *bgzf_mt_writer(void *vp) {
1400
0
    BGZF *fp = (BGZF *)vp;
1401
0
    mtaux_t *mt = fp->mt;
1402
0
    hts_tpool_result *r;
1403
1404
0
    if (fp->idx_build_otf) {
1405
0
        fp->idx->moffs = fp->idx->noffs = 1;
1406
0
        fp->idx->offs = (bgzidx1_t*) calloc(fp->idx->moffs, sizeof(bgzidx1_t));
1407
0
        if (!fp->idx->offs) goto err;
1408
0
    }
1409
1410
    // Iterates until result queue is shutdown, where it returns NULL.
1411
0
    while ((r = hts_tpool_next_result_wait(mt->out_queue))) {
1412
0
        bgzf_job *j = (bgzf_job *)hts_tpool_result_data(r);
1413
0
        assert(j);
1414
1415
0
        if (fp->idx_build_otf) {
1416
0
            fp->idx->noffs++;
1417
0
            if ( fp->idx->noffs > fp->idx->moffs )
1418
0
            {
1419
0
                fp->idx->moffs = fp->idx->noffs;
1420
0
                kroundup32(fp->idx->moffs);
1421
0
                fp->idx->offs = hts_realloc_p(fp->idx->offs, sizeof(bgzidx1_t),
1422
0
                                              fp->idx->moffs);
1423
0
                if ( !fp->idx->offs ) goto err;
1424
0
            }
1425
0
            fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->offs[ fp->idx->noffs-2 ].uaddr + j->uncomp_len;
1426
0
            fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->idx->offs[ fp->idx->noffs-2 ].caddr + j->comp_len;
1427
0
        }
1428
1429
        // Flush any cached hts_idx_push calls
1430
0
        if (bgzf_idx_flush(fp, j->uncomp_len, j->comp_len) < 0)
1431
0
            goto err;
1432
1433
0
        if (hwrite(fp->fp, j->comp_data, j->comp_len) != j->comp_len)
1434
0
            goto err;
1435
1436
        // Update our local block_address.  Cannot be fp->block_address due to no
1437
        // locking in bgzf_tell.
1438
0
        pthread_mutex_lock(&mt->idx_m);
1439
0
        mt->block_address += j->comp_len;
1440
0
        pthread_mutex_unlock(&mt->idx_m);
1441
1442
        /*
1443
         * Periodically call hflush (which calls fsync when on a file).
1444
         * This avoids the fsync being done at the bgzf_close stage,
1445
         * which can sometimes cause significant delays.  As this is in
1446
         * a separate thread, spreading the sync delays throughout the
1447
         * program execution seems better.
1448
         * Frequency of 1/512 has been chosen by experimentation
1449
         * across local XFS, NFS and Lustre tests.
1450
         */
1451
0
        if (++mt->flush_pending % 512 == 0)
1452
0
            if (hflush(fp->fp) != 0)
1453
0
                goto err;
1454
1455
1456
0
        hts_tpool_delete_result(r, 0);
1457
1458
        // Also updated by main thread
1459
0
        pthread_mutex_lock(&mt->job_pool_m);
1460
0
        pool_free(mt->job_pool, j);
1461
0
        mt->jobs_pending--;
1462
0
        pthread_mutex_unlock(&mt->job_pool_m);
1463
0
    }
1464
1465
0
    if (hflush(fp->fp) != 0)
1466
0
        goto err;
1467
1468
0
    hts_tpool_process_destroy(mt->out_queue);
1469
1470
0
    return NULL;
1471
1472
0
 err:
1473
0
    hts_tpool_process_destroy(mt->out_queue);
1474
0
    return (void *)-1;
1475
0
}
1476
1477
1478
/*
1479
 * Reads a compressed block of data using hread and dispatches it to
1480
 * the thread pool for decompression.  This is the analogue of the old
1481
 * non-threaded bgzf_read_block() function, but without modifying fp
1482
 * in any way (except for the read offset).  All output goes via the
1483
 * supplied bgzf_job struct.
1484
 *
1485
 * Returns NULL when no more are left, or -1 on error
1486
 */
1487
int bgzf_mt_read_block(BGZF *fp, bgzf_job *j)
1488
0
{
1489
0
    uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
1490
0
    int count, block_length, remaining;
1491
1492
    // NOTE: Guaranteed to be compressed as we block multi-threading in
1493
    // uncompressed mode.  However it may be gzip compression instead
1494
    // of bgzf.
1495
1496
    // Reading compressed file
1497
0
    int64_t block_address;
1498
0
    block_address = htell(fp->fp);
1499
1500
0
    j->block_address = block_address;  // in case we exit with j->errcode
1501
1502
0
    if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
1503
0
    count = hpeek(fp->fp, header, sizeof(header));
1504
0
    if (count == 0) // no data read
1505
0
        return -1;
1506
0
    int ret;
1507
0
    if ( count != sizeof(header) || (ret=check_header(header))==-2 )
1508
0
    {
1509
0
        j->errcode |= BGZF_ERR_HEADER;
1510
0
        return -1;
1511
0
    }
1512
0
    if (ret == -1) {
1513
0
        j->errcode |= BGZF_ERR_MT;
1514
0
        return -1;
1515
0
    }
1516
1517
0
    count = hread(fp->fp, header, sizeof(header));
1518
0
    if (count != sizeof(header)) // no data read
1519
0
        return -1;
1520
1521
0
    block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1522
0
    if (block_length < BLOCK_HEADER_LENGTH) {
1523
0
        j->errcode |= BGZF_ERR_HEADER;
1524
0
        return -1;
1525
0
    }
1526
0
    compressed_block = (uint8_t*)j->comp_data;
1527
0
    memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1528
0
    remaining = block_length - BLOCK_HEADER_LENGTH;
1529
0
    count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1530
0
    if (count != remaining) {
1531
0
        j->errcode |= BGZF_ERR_IO;
1532
0
        return -1;
1533
0
    }
1534
0
    j->comp_len = block_length;
1535
0
    j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1536
0
    j->block_address = block_address;
1537
0
    j->fp = fp;
1538
0
    j->errcode = 0;
1539
1540
0
    return 0;
1541
0
}
1542
1543
1544
static int bgzf_check_EOF_common(BGZF *fp)
1545
2.13k
{
1546
2.13k
    uint8_t buf[28];
1547
2.13k
    off_t offset = htell(fp->fp);
1548
2.13k
    if (hseek(fp->fp, -28, SEEK_END) < 0) {
1549
432
        if (errno == ESPIPE) { hclearerr(fp->fp); return 2; }
1550
#ifdef _WIN32
1551
        if (errno == EINVAL) { hclearerr(fp->fp); return 2; }
1552
#else
1553
        // Assume that EINVAL was due to the file being less than 28 bytes
1554
        // long, rather than being a random error return from an hfile backend.
1555
        // This should be reported as "no EOF block" rather than an error.
1556
432
        if (errno == EINVAL) { hclearerr(fp->fp); return 0; }
1557
0
#endif
1558
0
        return -1;
1559
432
    }
1560
1.70k
    if ( hread(fp->fp, buf, 28) != 28 ) return -1;
1561
1.70k
    if ( hseek(fp->fp, offset, SEEK_SET) < 0 ) return -1;
1562
1.70k
    return (memcmp("\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", buf, 28) == 0)? 1 : 0;
1563
1.70k
}
1564
1565
/*
1566
 * Checks EOF from the reader thread.
1567
 */
1568
0
static void bgzf_mt_eof(BGZF *fp) {
1569
0
    mtaux_t *mt = fp->mt;
1570
1571
0
    pthread_mutex_lock(&mt->job_pool_m);
1572
0
    mt->eof = bgzf_check_EOF_common(fp);
1573
0
    pthread_mutex_unlock(&mt->job_pool_m);
1574
0
    mt->command = HAS_EOF_DONE;
1575
0
    pthread_cond_signal(&mt->command_c);
1576
0
}
1577
1578
1579
/*
1580
 * Performs the seek (called by reader thread).
1581
 *
1582
 * This simply drains the entire queue, throwing away blocks, seeks,
1583
 * and starts it up again.  Brute force, but maybe sufficient.
1584
 */
1585
0
static void bgzf_mt_seek(BGZF *fp) {
1586
0
    mtaux_t *mt = fp->mt;
1587
1588
0
    hts_tpool_process_reset(mt->out_queue, 0);
1589
0
    pthread_mutex_lock(&mt->job_pool_m);
1590
0
    mt->errcode = 0;
1591
1592
0
    if (hseek(fp->fp, mt->block_address, SEEK_SET) < 0)
1593
0
        mt->errcode = errno;
1594
1595
0
    pthread_mutex_unlock(&mt->job_pool_m);
1596
0
    mt->command = SEEK_DONE;
1597
0
    pthread_cond_signal(&mt->command_c);
1598
0
}
1599
1600
0
static void *bgzf_mt_reader(void *vp) {
1601
0
    BGZF *fp = (BGZF *)vp;
1602
0
    mtaux_t *mt = fp->mt;
1603
1604
0
restart:
1605
0
    pthread_mutex_lock(&mt->job_pool_m);
1606
0
    bgzf_job *j = pool_alloc(mt->job_pool);
1607
0
    pthread_mutex_unlock(&mt->job_pool_m);
1608
0
    if (!j) goto err;
1609
0
    j->errcode = 0;
1610
0
    j->comp_len = 0;
1611
0
    j->uncomp_len = 0;
1612
0
    j->hit_eof = 0;
1613
0
    j->fp = fp;
1614
1615
0
    while (bgzf_mt_read_block(fp, j) == 0) {
1616
        // Dispatch
1617
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_decode_func, j,
1618
0
                                job_cleanup, job_cleanup, 0) < 0) {
1619
0
            job_cleanup(j);
1620
0
            goto err;
1621
0
        }
1622
1623
        // Check for command
1624
0
        pthread_mutex_lock(&mt->command_m);
1625
0
        switch (mt->command) {
1626
0
        case SEEK:
1627
0
            bgzf_mt_seek(fp);  // Sets mt->command to SEEK_DONE
1628
0
            pthread_mutex_unlock(&mt->command_m);
1629
0
            goto restart;
1630
1631
0
        case HAS_EOF:
1632
0
            bgzf_mt_eof(fp);   // Sets mt->command to HAS_EOF_DONE
1633
0
            break;
1634
1635
0
        case SEEK_DONE:
1636
0
        case HAS_EOF_DONE:
1637
0
            pthread_cond_signal(&mt->command_c);
1638
0
            break;
1639
1640
0
        case CLOSE:
1641
0
            pthread_cond_signal(&mt->command_c);
1642
0
            pthread_mutex_unlock(&mt->command_m);
1643
0
            hts_tpool_process_destroy(mt->out_queue);
1644
0
            return NULL;
1645
1646
0
        default:
1647
0
            break;
1648
0
        }
1649
0
        pthread_mutex_unlock(&mt->command_m);
1650
1651
        // Allocate buffer for next block
1652
0
        pthread_mutex_lock(&mt->job_pool_m);
1653
0
        j = pool_alloc(mt->job_pool);
1654
0
        pthread_mutex_unlock(&mt->job_pool_m);
1655
0
        if (!j) {
1656
0
            hts_tpool_process_destroy(mt->out_queue);
1657
0
            return NULL;
1658
0
        }
1659
0
        j->errcode = 0;
1660
0
        j->comp_len = 0;
1661
0
        j->uncomp_len = 0;
1662
0
        j->hit_eof = 0;
1663
0
        j->fp = fp;
1664
0
    }
1665
1666
0
    if (j->errcode == BGZF_ERR_MT) {
1667
        // Attempt to multi-thread decode a raw gzip stream cannot be done.
1668
        // We tear down the multi-threaded decoder and revert to the old code.
1669
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_nul_func, j,
1670
0
                                job_cleanup, job_cleanup, 0) < 0) {
1671
0
            job_cleanup(j);
1672
0
            hts_tpool_process_destroy(mt->out_queue);
1673
0
            return NULL;
1674
0
        }
1675
0
        hts_tpool_process_ref_decr(mt->out_queue);
1676
0
        return &j->errcode;
1677
0
    }
1678
1679
    // Dispatch an empty block so EOF is spotted.
1680
    // We also use this mechanism for returning errors, in which case
1681
    // j->errcode is set already.
1682
1683
0
    j->hit_eof = 1;
1684
0
    if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_nul_func, j,
1685
0
                            job_cleanup, job_cleanup, 0) < 0) {
1686
0
        job_cleanup(j);
1687
0
        hts_tpool_process_destroy(mt->out_queue);
1688
0
        return NULL;
1689
0
    }
1690
0
    if (j->errcode != 0) {
1691
0
        hts_tpool_process_destroy(mt->out_queue);
1692
0
        return &j->errcode;
1693
0
    }
1694
1695
    // We hit EOF so can stop reading, but we may get a subsequent
1696
    // seek request.  In this case we need to restart the reader.
1697
    //
1698
    // To handle this we wait on a condition variable and then
1699
    // monitor the command. (This could be either seek or close.)
1700
0
    for (;;) {
1701
0
        pthread_mutex_lock(&mt->command_m);
1702
0
        if (mt->command == NONE)
1703
0
            pthread_cond_wait(&mt->command_c, &mt->command_m);
1704
0
        switch(mt->command) {
1705
0
        default:
1706
0
            pthread_mutex_unlock(&mt->command_m);
1707
0
            break;
1708
1709
0
        case SEEK:
1710
0
            bgzf_mt_seek(fp);
1711
0
            pthread_mutex_unlock(&mt->command_m);
1712
0
            goto restart;
1713
1714
0
        case HAS_EOF:
1715
0
            bgzf_mt_eof(fp);   // Sets mt->command to HAS_EOF_DONE
1716
0
            pthread_mutex_unlock(&mt->command_m);
1717
0
            break;
1718
1719
0
        case SEEK_DONE:
1720
0
        case HAS_EOF_DONE:
1721
0
            pthread_cond_signal(&mt->command_c);
1722
0
            pthread_mutex_unlock(&mt->command_m);
1723
0
            break;
1724
1725
0
        case CLOSE:
1726
0
            pthread_cond_signal(&mt->command_c);
1727
0
            pthread_mutex_unlock(&mt->command_m);
1728
0
            hts_tpool_process_destroy(mt->out_queue);
1729
0
            return NULL;
1730
0
        }
1731
0
    }
1732
1733
0
 err:
1734
0
    pthread_mutex_lock(&mt->command_m);
1735
0
    mt->command = CLOSE;
1736
0
    pthread_cond_signal(&mt->command_c);
1737
0
    pthread_mutex_unlock(&mt->command_m);
1738
0
    hts_tpool_process_destroy(mt->out_queue);
1739
0
    return NULL;
1740
0
}
1741
1742
0
int bgzf_thread_pool(BGZF *fp, hts_tpool *pool, int qsize) {
1743
    // No gain from multi-threading when not compressed
1744
0
    if (!fp->is_compressed)
1745
0
        return 0;
1746
0
    if (fp->mt)
1747
0
        return -2;  //already exists!
1748
1749
0
    mtaux_t *mt;
1750
0
    mt = (mtaux_t*)calloc(1, sizeof(mtaux_t));
1751
0
    if (!mt) return -1;
1752
0
    fp->mt = mt;
1753
1754
0
    mt->pool = pool;
1755
0
    mt->n_threads = hts_tpool_size(pool);
1756
0
    if (!qsize)
1757
0
        qsize = mt->n_threads*2;
1758
0
    if (!(mt->out_queue = hts_tpool_process_init(mt->pool, qsize, 0)))
1759
0
        goto err;
1760
0
    hts_tpool_process_ref_incr(mt->out_queue);
1761
1762
0
    mt->job_pool = pool_create(sizeof(bgzf_job));
1763
0
    if (!mt->job_pool)
1764
0
        goto err;
1765
1766
0
    pthread_mutex_init(&mt->job_pool_m, NULL);
1767
0
    pthread_mutex_init(&mt->command_m, NULL);
1768
0
    pthread_mutex_init(&mt->idx_m, NULL);
1769
0
    pthread_cond_init(&mt->command_c, NULL);
1770
0
    mt->flush_pending = 0;
1771
0
    mt->jobs_pending = 0;
1772
0
    mt->free_block = fp->uncompressed_block; // currently in-use block
1773
0
    mt->block_address = fp->block_address;
1774
0
    pthread_create(&mt->io_task, NULL,
1775
0
                   fp->is_write ? bgzf_mt_writer : bgzf_mt_reader, fp);
1776
1777
0
    return 0;
1778
1779
0
 err:
1780
0
    free(mt);
1781
0
    fp->mt = NULL;
1782
0
    return -1;
1783
0
}
1784
1785
int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1786
0
{
1787
    // No gain from multi-threading when not compressed
1788
0
    if (!fp->is_compressed || fp->is_gzip)
1789
0
        return 0;
1790
1791
0
    if (n_threads < 1) return -1;
1792
0
    hts_tpool *p = hts_tpool_init(n_threads);
1793
0
    if (!p)
1794
0
        return -1;
1795
1796
0
    int ret = 0;
1797
0
    if ((ret = bgzf_thread_pool(fp, p, 0)) < 0) {
1798
0
        hts_tpool_destroy(p);
1799
0
        return ret;
1800
0
    }
1801
1802
0
    fp->mt->own_pool = 1;
1803
1804
0
    return 0;
1805
0
}
1806
1807
static int mt_destroy(mtaux_t *mt)
1808
0
{
1809
0
    int ret = 0;
1810
1811
    // Tell the reader to shut down
1812
0
    pthread_mutex_lock(&mt->command_m);
1813
0
    mt->command = CLOSE;
1814
0
    pthread_cond_signal(&mt->command_c);
1815
0
    hts_tpool_wake_dispatch(mt->out_queue); // unstick the reader
1816
0
    pthread_mutex_unlock(&mt->command_m);
1817
1818
    // Check for thread worker failure, indicated by is_shutdown returning 2
1819
    // It's possible really late errors might be missed, but we can live with
1820
    // that.
1821
0
    ret = -(hts_tpool_process_is_shutdown(mt->out_queue) > 1);
1822
    // Destroying the queue first forces the writer to exit.
1823
    // mt->out_queue is reference counted, so destroy gets called in both
1824
    // this and the IO threads.  The last to do it will clean up.
1825
0
    hts_tpool_process_destroy(mt->out_queue);
1826
1827
    // IO thread will now exit.  Wait for it and perform final clean-up.
1828
    // If it returned non-NULL, it was not happy.
1829
0
    void *retval = NULL;
1830
0
    pthread_join(mt->io_task, &retval);
1831
0
    ret = retval != NULL ? -1 : ret;
1832
1833
0
    pthread_mutex_destroy(&mt->job_pool_m);
1834
0
    pthread_mutex_destroy(&mt->command_m);
1835
0
    pthread_mutex_destroy(&mt->idx_m);
1836
0
    pthread_cond_destroy(&mt->command_c);
1837
0
    if (mt->curr_job)
1838
0
        pool_free(mt->job_pool, mt->curr_job);
1839
1840
0
    if (mt->own_pool)
1841
0
        hts_tpool_destroy(mt->pool);
1842
1843
0
    pool_destroy(mt->job_pool);
1844
1845
0
    if (mt->idx_cache.e)
1846
0
        free(mt->idx_cache.e);
1847
1848
0
    free(mt);
1849
0
    fflush(stderr);
1850
1851
0
    return ret;
1852
0
}
1853
1854
static int mt_queue(BGZF *fp)
1855
0
{
1856
0
    mtaux_t *mt = fp->mt;
1857
1858
0
    mt->block_number++;
1859
1860
    // Also updated by writer thread
1861
0
    pthread_mutex_lock(&mt->job_pool_m);
1862
0
    bgzf_job *j = pool_alloc(mt->job_pool);
1863
0
    if (j) mt->jobs_pending++;
1864
0
    pthread_mutex_unlock(&mt->job_pool_m);
1865
0
    if (!j) return -1;
1866
1867
0
    j->fp = fp;
1868
0
    j->errcode = 0;
1869
0
    j->uncomp_len  = fp->block_offset;
1870
0
    if (fp->compress_level == 0) {
1871
0
        memcpy(j->comp_data + BLOCK_HEADER_LENGTH + 5, fp->uncompressed_block,
1872
0
               j->uncomp_len);
1873
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue,
1874
0
                                bgzf_encode_level0_func, j,
1875
0
                                job_cleanup, job_cleanup, 0) < 0) {
1876
0
            goto fail;
1877
0
        }
1878
0
    } else {
1879
0
        memcpy(j->uncomp_data, fp->uncompressed_block, j->uncomp_len);
1880
1881
        // Need non-block vers & job_pending?
1882
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_encode_func, j,
1883
0
                                job_cleanup, job_cleanup, 0) < 0) {
1884
0
            goto fail;
1885
0
        }
1886
0
    }
1887
1888
0
    fp->block_offset = 0;
1889
0
    return 0;
1890
1891
0
 fail:
1892
0
    job_cleanup(j);
1893
0
    pthread_mutex_lock(&mt->job_pool_m);
1894
0
    mt->jobs_pending--;
1895
0
    pthread_mutex_unlock(&mt->job_pool_m);
1896
0
    return -1;
1897
0
}
1898
1899
static int mt_flush_queue(BGZF *fp)
1900
0
{
1901
0
    mtaux_t *mt = fp->mt;
1902
1903
    // Drain the encoder jobs.
1904
    // We cannot use hts_tpool_flush here as it can cause deadlock if
1905
    // the queue is full up of decoder tasks.  The best solution would
1906
    // be to have one input queue per type of job, but we don't right now.
1907
    //hts_tpool_flush(mt->pool);
1908
0
    pthread_mutex_lock(&mt->job_pool_m);
1909
0
    int shutdown = 0;
1910
0
    while (mt->jobs_pending != 0) {
1911
0
        if ((shutdown = hts_tpool_process_is_shutdown(mt->out_queue)))
1912
0
            break;
1913
0
        pthread_mutex_unlock(&mt->job_pool_m);
1914
0
        hts_usleep(10000); // FIXME: replace by condition variable
1915
0
        pthread_mutex_lock(&mt->job_pool_m);
1916
0
    }
1917
0
    pthread_mutex_unlock(&mt->job_pool_m);
1918
1919
0
    if (shutdown)
1920
0
        return -1;
1921
1922
    // Wait on bgzf_mt_writer to drain the queue
1923
0
    if (hts_tpool_process_flush(mt->out_queue) != 0)
1924
0
        return -1;
1925
1926
0
    return (fp->errcode == 0)? 0 : -1;
1927
0
}
1928
1929
static int lazy_flush(BGZF *fp)
1930
11.1k
{
1931
11.1k
    if (fp->mt)
1932
0
        return fp->block_offset ? mt_queue(fp) : 0;
1933
11.1k
    else
1934
11.1k
        return bgzf_flush(fp);
1935
11.1k
}
1936
1937
#else  // ~ #ifdef BGZF_MT
1938
1939
int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1940
{
1941
    return 0;
1942
}
1943
1944
static inline int lazy_flush(BGZF *fp)
1945
{
1946
    return bgzf_flush(fp);
1947
}
1948
1949
#endif // ~ #ifdef BGZF_MT
1950
1951
int bgzf_flush(BGZF *fp)
1952
31.2k
{
1953
31.2k
    if (!fp->is_write) return 0;
1954
31.2k
#ifdef BGZF_MT
1955
31.2k
    if (fp->mt) {
1956
0
        int ret = 0;
1957
0
        if (fp->block_offset) ret = mt_queue(fp);
1958
0
        if (!ret) ret = mt_flush_queue(fp);
1959
1960
        // We maintain mt->block_address when threading as the
1961
        // main code can call bgzf_tell without any locks.
1962
        // (The result from tell are wrong, but we only care about the last
1963
        // 16-bits worth except for the final flush process.
1964
0
        pthread_mutex_lock(&fp->mt->idx_m);
1965
0
        fp->block_address = fp->mt->block_address;
1966
0
        pthread_mutex_unlock(&fp->mt->idx_m);
1967
1968
0
        return ret;
1969
0
    }
1970
31.2k
#endif
1971
1972
31.2k
    if (!fp->is_compressed) {
1973
0
        return hflush(fp->fp);
1974
0
    }
1975
1976
52.9k
    while (fp->block_offset > 0) {
1977
21.6k
        int block_length;
1978
21.6k
        if ( fp->idx_build_otf )
1979
0
        {
1980
0
            bgzf_index_add_block(fp);
1981
0
            fp->idx->ublock_addr += fp->block_offset;
1982
0
        }
1983
21.6k
        block_length = deflate_block(fp, fp->block_offset);
1984
21.6k
        if (block_length < 0) {
1985
0
            hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length, NULL));
1986
0
            return -1;
1987
0
        }
1988
21.6k
        if (hwrite(fp->fp, fp->compressed_block, block_length) != block_length) {
1989
0
            hts_log_error("File write failed (wrong size)");
1990
0
            fp->errcode |= BGZF_ERR_IO; // possibly truncated file
1991
0
            return -1;
1992
0
        }
1993
21.6k
        fp->block_address += block_length;
1994
21.6k
    }
1995
31.2k
    return 0;
1996
31.2k
}
1997
1998
int bgzf_flush_try(BGZF *fp, ssize_t size)
1999
5.50M
{
2000
5.50M
    if (fp->block_offset + size > BGZF_BLOCK_SIZE) return lazy_flush(fp);
2001
5.50M
    return 0;
2002
5.50M
}
2003
2004
ssize_t bgzf_write(BGZF *fp, const void *data, size_t length)
2005
27.7k
{
2006
27.7k
    if ( !fp->is_compressed ) {
2007
0
        size_t push = length + (size_t) fp->block_offset;
2008
0
        fp->block_offset = push % BGZF_MAX_BLOCK_SIZE;
2009
0
        fp->block_address += (push - fp->block_offset);
2010
0
        return hwrite(fp->fp, data, length);
2011
0
    }
2012
2013
27.7k
    const uint8_t *input = (const uint8_t*)data;
2014
27.7k
    ssize_t remaining = length;
2015
27.7k
    assert(fp->is_write);
2016
62.7k
    while (remaining > 0) {
2017
35.0k
        uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
2018
35.0k
        int copy_length = BGZF_BLOCK_SIZE - fp->block_offset;
2019
35.0k
        if (copy_length > remaining) copy_length = remaining;
2020
35.0k
        memcpy(buffer + fp->block_offset, input, copy_length);
2021
35.0k
        fp->block_offset += copy_length;
2022
35.0k
        input += copy_length;
2023
35.0k
        remaining -= copy_length;
2024
35.0k
        if (fp->block_offset == BGZF_BLOCK_SIZE) {
2025
7.48k
            if (lazy_flush(fp) != 0) return -1;
2026
7.48k
        }
2027
35.0k
    }
2028
27.7k
    return length - remaining;
2029
27.7k
}
2030
2031
ssize_t bgzf_block_write(BGZF *fp, const void *data, size_t length)
2032
0
{
2033
0
    if ( !fp->is_compressed ) {
2034
0
        size_t push = length + (size_t) fp->block_offset;
2035
0
        fp->block_offset = push % BGZF_MAX_BLOCK_SIZE;
2036
0
        fp->block_address += (push - fp->block_offset);
2037
0
        return hwrite(fp->fp, data, length);
2038
0
    }
2039
2040
0
    const uint8_t *input = (const uint8_t*)data;
2041
0
    ssize_t remaining = length;
2042
0
    assert(fp->is_write);
2043
0
    uint64_t current_block; //keep track of current block
2044
0
    uint64_t ublock_size; // amount of uncompressed data to be fed into next block
2045
0
    while (remaining > 0) {
2046
0
        current_block = fp->idx->moffs - fp->idx->noffs;
2047
0
        ublock_size = current_block + 1 < fp->idx->moffs ? fp->idx->offs[current_block+1].uaddr-fp->idx->offs[current_block].uaddr : BGZF_MAX_BLOCK_SIZE;
2048
0
        uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
2049
0
        int copy_length = ublock_size - fp->block_offset;
2050
0
        if (copy_length > remaining) copy_length = remaining;
2051
0
        memcpy(buffer + fp->block_offset, input, copy_length);
2052
0
        fp->block_offset += copy_length;
2053
0
        input += copy_length;
2054
0
        remaining -= copy_length;
2055
0
        if (fp->block_offset == ublock_size) {
2056
0
            if (lazy_flush(fp) != 0) return -1;
2057
0
            if (fp->idx->noffs > 0)
2058
0
                fp->idx->noffs--;  // decrement noffs to track the blocks
2059
0
        }
2060
0
    }
2061
0
    return length - remaining;
2062
0
}
2063
2064
2065
ssize_t bgzf_raw_write(BGZF *fp, const void *data, size_t length)
2066
0
{
2067
0
    ssize_t ret = hwrite(fp->fp, data, length);
2068
0
    if (ret < 0) fp->errcode |= BGZF_ERR_IO;
2069
0
    return ret;
2070
0
}
2071
2072
// Helper function for tidying up fp->mt and setting errcode
2073
18.0k
static void bgzf_close_mt(BGZF *fp) {
2074
18.0k
    if (fp->mt) {
2075
0
        if (!fp->mt->free_block)
2076
0
            fp->uncompressed_block = NULL;
2077
0
        if (mt_destroy(fp->mt) < 0)
2078
0
            fp->errcode = BGZF_ERR_IO;
2079
0
    }
2080
18.0k
}
2081
2082
int bgzf_close(BGZF* fp)
2083
18.0k
{
2084
18.0k
    int ret, block_length;
2085
18.0k
    if (fp == 0) return -1;
2086
18.0k
    if (fp->is_write && fp->is_compressed) {
2087
12.1k
        if (bgzf_flush(fp) != 0) {
2088
0
            bgzf_close_mt(fp);
2089
0
            return -1;
2090
0
        }
2091
12.1k
        fp->compress_level = -1;
2092
12.1k
        block_length = deflate_block(fp, 0); // write an empty block
2093
12.1k
        if (block_length < 0) {
2094
0
            hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length, NULL));
2095
0
            bgzf_close_mt(fp);
2096
0
            return -1;
2097
0
        }
2098
12.1k
        if (hwrite(fp->fp, fp->compressed_block, block_length) < 0
2099
12.1k
            || hflush(fp->fp) != 0) {
2100
0
            hts_log_error("File write failed");
2101
0
            fp->errcode |= BGZF_ERR_IO;
2102
0
            return -1;
2103
0
        }
2104
12.1k
    }
2105
2106
18.0k
    bgzf_close_mt(fp);
2107
2108
18.0k
    if ( fp->is_gzip )
2109
1.96k
    {
2110
1.96k
        if (fp->gz_stream == NULL) ret = Z_OK;
2111
1.41k
        else if (!fp->is_write) ret = inflateEnd(fp->gz_stream);
2112
0
        else ret = deflateEnd(fp->gz_stream);
2113
1.96k
        if (ret != Z_OK) {
2114
0
            hts_log_error("Call to inflateEnd/deflateEnd failed: %s", bgzf_zerr(ret, NULL));
2115
0
        }
2116
1.96k
        free(fp->gz_stream);
2117
1.96k
    }
2118
18.0k
    ret = hclose(fp->fp);
2119
18.0k
    if (ret != 0) return -1;
2120
18.0k
    bgzf_index_destroy(fp);
2121
18.0k
    free(fp->uncompressed_block);
2122
18.0k
    free_cache(fp);
2123
18.0k
    ret = fp->errcode ? -1 : 0;
2124
18.0k
    free(fp);
2125
18.0k
    return ret;
2126
18.0k
}
2127
2128
void bgzf_set_cache_size(BGZF *fp, int cache_size)
2129
0
{
2130
0
    if (fp && fp->mt) return; // Not appropriate when multi-threading
2131
0
    if (fp && fp->cache) fp->cache_size = cache_size;
2132
0
}
2133
2134
2.13k
int bgzf_check_EOF(BGZF *fp) {
2135
2.13k
    int has_eof;
2136
2137
2.13k
    if (fp->mt) {
2138
0
        pthread_mutex_lock(&fp->mt->command_m);
2139
        // fp->mt->command state transitions should be:
2140
        // NONE -> HAS_EOF -> HAS_EOF_DONE -> NONE
2141
        // (HAS_EOF -> HAS_EOF_DONE happens in bgzf_mt_reader thread)
2142
0
        if (fp->mt->command != CLOSE)
2143
0
            fp->mt->command = HAS_EOF;
2144
0
        pthread_cond_signal(&fp->mt->command_c);
2145
0
        hts_tpool_wake_dispatch(fp->mt->out_queue);
2146
0
        do {
2147
0
            if (fp->mt->command == CLOSE) {
2148
                // possible error in bgzf_mt_reader
2149
0
                pthread_mutex_unlock(&fp->mt->command_m);
2150
0
                return 0;
2151
0
            }
2152
0
            pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
2153
0
            switch (fp->mt->command) {
2154
0
            case HAS_EOF_DONE: break;
2155
0
            case HAS_EOF:
2156
                // Resend signal intended for bgzf_mt_reader()
2157
0
                pthread_cond_signal(&fp->mt->command_c);
2158
0
                break;
2159
0
            case CLOSE:
2160
0
                continue;
2161
0
            default:
2162
0
                abort();  // Should not get to any other state
2163
0
            }
2164
0
        } while (fp->mt->command != HAS_EOF_DONE);
2165
0
        fp->mt->command = NONE;
2166
0
        has_eof = fp->mt->eof;
2167
0
        pthread_mutex_unlock(&fp->mt->command_m);
2168
2.13k
    } else {
2169
2.13k
        has_eof = bgzf_check_EOF_common(fp);
2170
2.13k
    }
2171
2172
2.13k
    fp->no_eof_block = (has_eof == 0);
2173
2174
2.13k
    return has_eof;
2175
2.13k
}
2176
2177
static inline int64_t bgzf_seek_common(BGZF* fp,
2178
                                       int64_t block_address, int block_offset)
2179
0
{
2180
0
    if (fp->mt) {
2181
        // The reader runs asynchronous and does loops of:
2182
        //    Read block
2183
        //    Check & process command
2184
        //    Dispatch decode job
2185
        //
2186
        // Once at EOF it then switches to loops of
2187
        //    Wait for command
2188
        //    Process command (possibly switching back to above loop).
2189
        //
2190
        // To seek we therefore send the reader thread a SEEK command,
2191
        // waking it up if blocked in dispatch and signalling if
2192
        // waiting for a command.  We then wait for the response so we
2193
        // know the seek succeeded.
2194
0
        pthread_mutex_lock(&fp->mt->command_m);
2195
0
        fp->mt->hit_eof = 0;
2196
        // fp->mt->command state transitions should be:
2197
        // NONE -> SEEK -> SEEK_DONE -> NONE
2198
        // (SEEK -> SEEK_DONE happens in bgzf_mt_reader thread)
2199
0
        fp->mt->command = SEEK;
2200
0
        fp->mt->block_address = block_address;
2201
0
        pthread_cond_signal(&fp->mt->command_c);
2202
0
        hts_tpool_wake_dispatch(fp->mt->out_queue);
2203
0
        do {
2204
0
            pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
2205
0
            switch (fp->mt->command) {
2206
0
            case SEEK_DONE: break;
2207
0
            case SEEK:
2208
                // Resend signal intended for bgzf_mt_reader()
2209
0
                pthread_cond_signal(&fp->mt->command_c);
2210
0
                break;
2211
0
            default:
2212
0
                abort();  // Should not get to any other state
2213
0
            }
2214
0
        } while (fp->mt->command != SEEK_DONE);
2215
2216
0
        fp->mt->command = NONE;
2217
2218
0
        if (fp->mt->errcode) {
2219
0
            fp->errcode |= BGZF_ERR_IO;
2220
0
            errno = fp->mt->errcode;
2221
0
            pthread_mutex_unlock(&fp->mt->command_m);
2222
0
            return -1;
2223
0
        }
2224
2225
0
        fp->block_length = 0;  // indicates current block has not been loaded
2226
0
        fp->block_address = block_address;
2227
0
        fp->block_offset = block_offset;
2228
2229
0
        pthread_mutex_unlock(&fp->mt->command_m);
2230
0
    } else {
2231
0
        if (hseek(fp->fp, block_address, SEEK_SET) < 0) {
2232
0
            fp->errcode |= BGZF_ERR_IO;
2233
0
            return -1;
2234
0
        }
2235
0
        fp->block_length = 0;  // indicates current block has not been loaded
2236
0
        fp->block_address = block_address;
2237
0
        fp->block_offset = block_offset;
2238
0
    }
2239
2240
0
    return 0;
2241
0
}
2242
2243
int64_t bgzf_seek(BGZF* fp, int64_t pos, int where)
2244
0
{
2245
0
    if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
2246
0
        fp->errcode |= BGZF_ERR_MISUSE;
2247
0
        return -1;
2248
0
    }
2249
2250
    // This is a flag to indicate we've jumped elsewhere in the stream, to act
2251
    // as a hint to any other code which is wrapping up bgzf for its own
2252
    // purposes.  We may not be able to tell when seek happens as it can be
2253
    // done on our behalf, eg by the iterator.
2254
    //
2255
    // This is never cleared here.  Any tool that needs to handle it is also
2256
    // responsible for clearing it.
2257
0
    fp->seeked = pos;
2258
2259
0
    return bgzf_seek_common(fp, pos >> 16, pos & 0xFFFF);
2260
0
}
2261
2262
int bgzf_is_bgzf(const char *fn)
2263
0
{
2264
0
    uint8_t buf[16];
2265
0
    int n;
2266
0
    hFILE *fp;
2267
0
    if ((fp = hopen(fn, "r")) == 0) return 0;
2268
0
    n = hread(fp, buf, 16);
2269
0
    if (hclose(fp) < 0) return 0;
2270
0
    if (n != 16) return 0;
2271
0
    return check_header(buf) == 0? 1 : 0;
2272
0
}
2273
2274
int bgzf_compression(BGZF *fp)
2275
0
{
2276
0
    return (!fp->is_compressed)? no_compression : (fp->is_gzip)? gzip : bgzf;
2277
0
}
2278
2279
int bgzf_getc(BGZF *fp)
2280
0
{
2281
0
    if (fp->block_offset+1 < fp->block_length) {
2282
0
        fp->uncompressed_address++;
2283
0
        return ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
2284
0
    }
2285
2286
0
    int c;
2287
0
    if (fp->block_offset >= fp->block_length) {
2288
0
        if (bgzf_read_block(fp) != 0) return -2; /* error */
2289
0
        if (fp->block_length == 0) return -1; /* end-of-file */
2290
0
    }
2291
0
    c = ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
2292
0
    if (fp->block_offset == fp->block_length) {
2293
0
        fp->block_address = bgzf_htell(fp);
2294
0
        fp->block_offset = 0;
2295
0
        fp->block_length = 0;
2296
0
    }
2297
0
    fp->uncompressed_address++;
2298
0
    return c;
2299
0
}
2300
2301
int bgzf_getline(BGZF *fp, int delim, kstring_t *str)
2302
69.9M
{
2303
69.9M
    int l, state = 0;
2304
69.9M
    str->l = 0;
2305
69.9M
    do {
2306
69.9M
        if (fp->block_offset >= fp->block_length) {
2307
45.6k
            if (bgzf_read_block(fp) != 0) { state = -2; break; }
2308
44.7k
            if (fp->block_length == 0) { state = -1; break; }
2309
44.7k
        }
2310
69.9M
        unsigned char *buf = fp->uncompressed_block;
2311
2312
        // Equivalent to a naive byte by byte search from
2313
        // buf + block_offset to buf + block_length.
2314
69.9M
        void *e = memchr(&buf[fp->block_offset], delim,
2315
69.9M
                         fp->block_length - fp->block_offset);
2316
69.9M
        l = e ? (unsigned char *)e - buf : fp->block_length;
2317
2318
69.9M
        if (l < fp->block_length) state = 1;
2319
69.9M
        l -= fp->block_offset;
2320
69.9M
        if (ks_expand(str, l + 2) < 0) { state = -3; break; }
2321
69.9M
        memcpy(str->s + str->l, buf + fp->block_offset, l);
2322
69.9M
        str->l += l;
2323
69.9M
        fp->block_offset += l + 1;
2324
69.9M
        if (fp->block_offset >= fp->block_length) {
2325
44.1k
            fp->block_address = bgzf_htell(fp);
2326
44.1k
            fp->block_offset = 0;
2327
44.1k
            fp->block_length = 0;
2328
44.1k
        }
2329
69.9M
    } while (state == 0);
2330
69.9M
    if (state < -1) return state;
2331
69.9M
    if (str->l == 0 && state < 0) return state;
2332
69.9M
    fp->uncompressed_address += str->l + 1;
2333
69.9M
    if ( delim=='\n' && str->l>0 && str->s[str->l-1]=='\r' ) str->l--;
2334
69.9M
    str->s[str->l] = 0;
2335
69.9M
    return str->l <= INT_MAX ? (int) str->l : INT_MAX;
2336
69.9M
}
2337
2338
void bgzf_index_destroy(BGZF *fp)
2339
18.0k
{
2340
18.0k
    if ( !fp->idx ) return;
2341
0
    free(fp->idx->offs);
2342
0
    free(fp->idx);
2343
0
    fp->idx = NULL;
2344
0
    fp->idx_build_otf = 0;
2345
0
}
2346
2347
int bgzf_index_build_init(BGZF *fp)
2348
0
{
2349
0
    bgzf_index_destroy(fp);
2350
0
    fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
2351
0
    if ( !fp->idx ) return -1;
2352
0
    fp->idx_build_otf = 1;  // build index on the fly
2353
0
    return 0;
2354
0
}
2355
2356
int bgzf_index_add_block(BGZF *fp)
2357
0
{
2358
0
    fp->idx->noffs++;
2359
0
    if ( fp->idx->noffs > fp->idx->moffs )
2360
0
    {
2361
0
        fp->idx->moffs = fp->idx->noffs;
2362
0
        kroundup32(fp->idx->moffs);
2363
0
        fp->idx->offs = hts_realloc_p(fp->idx->offs, sizeof(bgzidx1_t),
2364
0
                                      fp->idx->moffs);
2365
0
        if ( !fp->idx->offs ) return -1;
2366
0
    }
2367
0
    fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->ublock_addr;
2368
0
    fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->block_address;
2369
0
    return 0;
2370
0
}
2371
2372
static inline int hwrite_uint64(uint64_t x, hFILE *f)
2373
0
{
2374
0
    if (ed_is_big()) x = ed_swap_8(x);
2375
0
    if (hwrite(f, &x, sizeof(x)) != sizeof(x)) return -1;
2376
0
    return 0;
2377
0
}
2378
2379
static char * get_name_suffix(const char *bname, const char *suffix)
2380
0
{
2381
0
    size_t len = strlen(bname) + strlen(suffix) + 1;
2382
0
    char *buff = malloc(len);
2383
0
    if (!buff) return NULL;
2384
0
    snprintf(buff, len, "%s%s", bname, suffix);
2385
0
    return buff;
2386
0
}
2387
2388
int bgzf_index_dump_hfile(BGZF *fp, struct hFILE *idx, const char *name)
2389
0
{
2390
    // Note that the index contains one extra record when indexing files opened
2391
    // for reading. The terminating record is not present when opened for writing.
2392
    // This is not a bug.
2393
2394
0
    int i;
2395
2396
0
    if (!fp->idx) {
2397
0
        hts_log_error("Called for BGZF handle with no index");
2398
0
        errno = EINVAL;
2399
0
        return -1;
2400
0
    }
2401
2402
0
    if (bgzf_flush(fp) != 0) return -1;
2403
2404
    // discard the entry marking the end of the file
2405
0
    if (fp->mt && fp->idx && fp->idx->noffs > 0)
2406
0
        fp->idx->noffs--;
2407
2408
0
    if (hwrite_uint64(fp->idx->noffs > 0 ? fp->idx->noffs - 1 : 0, idx) < 0)
2409
0
        goto fail;
2410
0
    for (i=1; i<fp->idx->noffs; i++)
2411
0
    {
2412
0
        if (hwrite_uint64(fp->idx->offs[i].caddr, idx) < 0) goto fail;
2413
0
        if (hwrite_uint64(fp->idx->offs[i].uaddr, idx) < 0) goto fail;
2414
0
    }
2415
0
    return 0;
2416
2417
0
 fail:
2418
0
    hts_log_error("Error writing to %s : %s", name ? name : "index", strerror(errno));
2419
0
    return -1;
2420
0
}
2421
2422
int bgzf_index_dump(BGZF *fp, const char *bname, const char *suffix)
2423
0
{
2424
0
    const char *name = bname, *msg = NULL;
2425
0
    char *tmp = NULL;
2426
0
    hFILE *idx = NULL;
2427
2428
0
    if (!fp->idx) {
2429
0
        hts_log_error("Called for BGZF handle with no index");
2430
0
        errno = EINVAL;
2431
0
        return -1;
2432
0
    }
2433
2434
0
    if ( suffix )
2435
0
    {
2436
0
        tmp = get_name_suffix(bname, suffix);
2437
0
        if ( !tmp ) return -1;
2438
0
        name = tmp;
2439
0
    }
2440
2441
0
    idx = hopen(name, "wb");
2442
0
    if ( !idx ) {
2443
0
        msg = "Error opening";
2444
0
        goto fail;
2445
0
    }
2446
2447
0
    if (bgzf_index_dump_hfile(fp, idx, name) != 0) goto fail;
2448
2449
0
    if (hclose(idx) < 0)
2450
0
    {
2451
0
        idx = NULL;
2452
0
        msg = "Error on closing";
2453
0
        goto fail;
2454
0
    }
2455
2456
0
    free(tmp);
2457
0
    return 0;
2458
2459
0
 fail:
2460
0
    if (msg != NULL) {
2461
0
        hts_log_error("%s %s : %s", msg, name, strerror(errno));
2462
0
    }
2463
0
    if (idx) hclose_abruptly(idx);
2464
0
    free(tmp);
2465
0
    return -1;
2466
0
}
2467
2468
static inline int hread_uint64(uint64_t *xptr, hFILE *f)
2469
0
{
2470
0
    if (hread(f, xptr, sizeof(*xptr)) != sizeof(*xptr)) return -1;
2471
0
    if (ed_is_big()) ed_swap_8p(xptr);
2472
0
    return 0;
2473
0
}
2474
2475
int bgzf_index_load_hfile(BGZF *fp, struct hFILE *idx, const char *name)
2476
0
{
2477
0
    fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
2478
0
    if (fp->idx == NULL) goto fail;
2479
0
    uint64_t x;
2480
0
    if (hread_uint64(&x, idx) < 0) goto fail;
2481
0
    if (x >= ((SIZE_MAX < UINT64_MAX ? SIZE_MAX : UINT64_MAX)
2482
0
              / sizeof(bgzidx1_t) / 2))
2483
0
        goto fail;
2484
2485
0
    fp->idx->noffs = fp->idx->moffs = x + 1;
2486
0
    fp->idx->offs  = (bgzidx1_t*) malloc(fp->idx->moffs*sizeof(bgzidx1_t));
2487
0
    if (fp->idx->offs == NULL) goto fail;
2488
0
    fp->idx->offs[0].caddr = fp->idx->offs[0].uaddr = 0;
2489
2490
0
    int i;
2491
0
    for (i=1; i<fp->idx->noffs; i++)
2492
0
    {
2493
0
        if (hread_uint64(&fp->idx->offs[i].caddr, idx) < 0) goto fail;
2494
0
        if (hread_uint64(&fp->idx->offs[i].uaddr, idx) < 0) goto fail;
2495
0
    }
2496
2497
0
    return 0;
2498
2499
0
 fail:
2500
0
    hts_log_error("Error reading %s : %s", name ? name : "index", strerror(errno));
2501
0
    if (fp->idx) {
2502
0
        free(fp->idx->offs);
2503
0
        free(fp->idx);
2504
0
        fp->idx = NULL;
2505
0
    }
2506
0
    return -1;
2507
0
}
2508
2509
int bgzf_index_load(BGZF *fp, const char *bname, const char *suffix)
2510
0
{
2511
0
    const char *name = bname, *msg = NULL;
2512
0
    char *tmp = NULL;
2513
0
    hFILE *idx = NULL;
2514
0
    if ( suffix )
2515
0
    {
2516
0
        tmp = get_name_suffix(bname, suffix);
2517
0
        if ( !tmp ) return -1;
2518
0
        name = tmp;
2519
0
    }
2520
2521
0
    idx = hopen(name, "rb");
2522
0
    if ( !idx ) {
2523
0
        msg = "Error opening";
2524
0
        goto fail;
2525
0
    }
2526
2527
0
    if (bgzf_index_load_hfile(fp, idx, name) != 0) goto fail;
2528
2529
0
    if (hclose(idx) != 0) {
2530
0
        idx = NULL;
2531
0
        msg = "Error closing";
2532
0
        goto fail;
2533
0
    }
2534
2535
0
    free(tmp);
2536
0
    return 0;
2537
2538
0
 fail:
2539
0
    if (msg != NULL) {
2540
0
        hts_log_error("%s %s : %s", msg, name, strerror(errno));
2541
0
    }
2542
0
    if (idx) hclose_abruptly(idx);
2543
0
    free(tmp);
2544
0
    return -1;
2545
0
}
2546
2547
int bgzf_useek(BGZF *fp, off_t uoffset, int where)
2548
0
{
2549
0
    if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
2550
0
        fp->errcode |= BGZF_ERR_MISUSE;
2551
0
        return -1;
2552
0
    }
2553
0
    if (uoffset >= fp->uncompressed_address - fp->block_offset &&
2554
0
        uoffset < fp->uncompressed_address + fp->block_length - fp->block_offset) {
2555
        // Can seek into existing data
2556
0
        fp->block_offset += uoffset - fp->uncompressed_address;
2557
0
        fp->uncompressed_address = uoffset;
2558
0
        return 0;
2559
0
    }
2560
0
    if ( !fp->is_compressed )
2561
0
    {
2562
0
        if (hseek(fp->fp, uoffset, SEEK_SET) < 0)
2563
0
        {
2564
0
            fp->errcode |= BGZF_ERR_IO;
2565
0
            return -1;
2566
0
        }
2567
0
        fp->block_length = 0;  // indicates current block has not been loaded
2568
0
        fp->block_address = uoffset;
2569
0
        fp->block_offset = 0;
2570
0
        if (bgzf_read_block(fp) < 0) {
2571
0
            fp->errcode |= BGZF_ERR_IO;
2572
0
            return -1;
2573
0
        }
2574
0
        fp->uncompressed_address = uoffset;
2575
0
        return 0;
2576
0
    }
2577
2578
0
    if ( !fp->idx )
2579
0
    {
2580
0
        fp->errcode |= BGZF_ERR_IO;
2581
0
        return -1;
2582
0
    }
2583
2584
    // binary search
2585
0
    int ilo = 0, ihi = fp->idx->noffs - 1;
2586
0
    while ( ilo<=ihi )
2587
0
    {
2588
0
        int i = (ilo+ihi)*0.5;
2589
0
        if ( uoffset < fp->idx->offs[i].uaddr ) ihi = i - 1;
2590
0
        else if ( uoffset >= fp->idx->offs[i].uaddr ) ilo = i + 1;
2591
0
        else break;
2592
0
    }
2593
0
    int i = ilo-1;
2594
0
    off_t offset = 0;
2595
0
    if (bgzf_seek_common(fp, fp->idx->offs[i].caddr, 0) < 0)
2596
0
        return -1;
2597
2598
0
    if ( bgzf_read_block(fp) < 0 ) {
2599
0
        fp->errcode |= BGZF_ERR_IO;
2600
0
        return -1;
2601
0
    }
2602
0
    offset = uoffset - fp->idx->offs[i].uaddr;
2603
0
    if ( offset > 0 )
2604
0
    {
2605
0
        if (offset > fp->block_length) {
2606
0
            fp->errcode |= BGZF_ERR_IO;
2607
0
            return -1;                                      //offset outside the available data
2608
0
        }
2609
0
        fp->block_offset = offset;
2610
0
        assert( fp->block_offset <= fp->block_length );     // todo: skipped, unindexed, blocks
2611
0
    }
2612
0
    fp->uncompressed_address = uoffset;
2613
0
    return 0;
2614
0
}
2615
2616
off_t bgzf_utell(BGZF *fp)
2617
0
{
2618
0
    return fp->uncompressed_address;    // currently maintained only when reading
2619
0
}
2620
2621
/* prototype is in hfile_internal.h */
2622
0
struct hFILE *bgzf_hfile(struct BGZF *fp) {
2623
0
    return fp->fp;
2624
0
}