Coverage Report

Created: 2025-07-12 06:16

/src/htslib/bgzf.c
Line
Count
Source
1
/* The MIT License
2
3
   Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4
                 2011, 2012 Attractive Chaos <attractor@live.co.uk>
5
   Copyright (C) 2009, 2013-2025 Genome Research Ltd
6
7
   Permission is hereby granted, free of charge, to any person obtaining a copy
8
   of this software and associated documentation files (the "Software"), to deal
9
   in the Software without restriction, including without limitation the rights
10
   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11
   copies of the Software, and to permit persons to whom the Software is
12
   furnished to do so, subject to the following conditions:
13
14
   The above copyright notice and this permission notice shall be included in
15
   all copies or substantial portions of the Software.
16
17
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19
   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20
   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21
   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22
   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23
   THE SOFTWARE.
24
*/
25
26
#define HTS_BUILDING_LIBRARY // Enables HTSLIB_EXPORT, see htslib/hts_defs.h
27
#include <config.h>
28
29
#include <stdio.h>
30
#include <stdlib.h>
31
#include <string.h>
32
#include <errno.h>
33
#include <unistd.h>
34
#include <assert.h>
35
#include <pthread.h>
36
#include <sys/types.h>
37
#include <inttypes.h>
38
#include <zlib.h>
39
40
#ifdef HAVE_LIBDEFLATE
41
#include <libdeflate.h>
42
#endif
43
44
#include "htslib/hts.h"
45
#include "htslib/bgzf.h"
46
#include "htslib/hfile.h"
47
#include "htslib/thread_pool.h"
48
#include "htslib/hts_endian.h"
49
#include "cram/pooled_alloc.h"
50
#include "hts_internal.h"
51
52
#ifndef EFTYPE
53
84
#define EFTYPE ENOEXEC
54
#endif
55
56
#define BGZF_CACHE
57
#define BGZF_MT
58
59
177k
#define BLOCK_HEADER_LENGTH 18
60
88.6k
#define BLOCK_FOOTER_LENGTH 8
61
62
63
/* BGZF/GZIP header (specialized from RFC 1952; little endian):
64
 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
65
 | 31|139|  8|  4|              0|  0|255|      6| 66| 67|      2|BLK_LEN|
66
 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
67
  BGZF extension:
68
                ^                              ^   ^   ^
69
                |                              |   |   |
70
               FLG.EXTRA                     XLEN  B   C
71
72
  BGZF format is compatible with GZIP. It limits the size of each compressed
73
  block to 2^16 bytes and adds an extra "BC" field in the gzip header which
74
  records the size.
75
76
*/
77
static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
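As a minimal illustration of the header layout described above (a sketch only, not part of bgzf.c or of this coverage run), the BSIZE value stored in the "BC" extra field at bytes 16-17 can be read back to recover the total block length:

    #include <stdint.h>

    /* Sketch: recover the total BGZF block length from the 18-byte header
     * described above.  BSIZE is little-endian at offsets 16-17 and holds
     * (total block length - 1), matching the "-1 to fit 2 bytes" convention
     * used when the header is written by bgzf_compress() below. */
    static uint32_t bgzf_block_size(const uint8_t header[18])
    {
        uint32_t bsize = header[16] | ((uint32_t)header[17] << 8);
        return bsize + 1;   /* header + compressed payload + 8-byte footer */
    }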
78
79
#ifdef BGZF_CACHE
80
typedef struct {
81
    int size;
82
    uint8_t *block;
83
    int64_t end_offset;
84
} cache_t;
85
86
#include "htslib/khash.h"
87
KHASH_MAP_INIT_INT64(cache, cache_t)
88
#endif
89
90
struct bgzf_cache_t {
91
    khash_t(cache) *h;
92
    khint_t last_pos;
93
};
94
95
#ifdef BGZF_MT
96
97
typedef struct bgzf_job {
98
    BGZF *fp;
99
    unsigned char comp_data[BGZF_MAX_BLOCK_SIZE];
100
    size_t comp_len;
101
    unsigned char uncomp_data[BGZF_MAX_BLOCK_SIZE];
102
    size_t uncomp_len;
103
    int errcode;
104
    int64_t block_address;
105
    int hit_eof;
106
} bgzf_job;
107
108
enum mtaux_cmd {
109
    NONE = 0,
110
    SEEK,
111
    SEEK_DONE,
112
    HAS_EOF,
113
    HAS_EOF_DONE,
114
    CLOSE,
115
};
116
117
// When multi-threaded, bgzf_tell won't work, so we delay the hts_idx_push
118
// until we've written the last block.
119
typedef struct {
120
    hts_pos_t beg, end;
121
    int tid, is_mapped;  // args for hts_idx_push
122
    uint64_t offset, block_number;
123
} hts_idx_cache_entry;
124
125
typedef struct {
126
    int nentries, mentries; // used and allocated
127
    hts_idx_cache_entry *e; // hts_idx elements
128
} hts_idx_cache_t;
129
130
typedef struct bgzf_mtaux_t {
131
    // Memory pool for bgzf_job structs, to avoid many malloc/free
132
    pool_alloc_t *job_pool;
133
    bgzf_job *curr_job;
134
135
    // Thread pool
136
    int n_threads;
137
    int own_pool;
138
    hts_tpool *pool;
139
140
    // Output queue holding completed bgzf_jobs
141
    hts_tpool_process *out_queue;
142
143
    // I/O thread.
144
    pthread_t io_task;
145
    pthread_mutex_t job_pool_m;
146
    int jobs_pending; // number of jobs waiting
147
    int flush_pending;
148
    void *free_block;
149
    int hit_eof;  // r/w entirely within main thread
150
151
    // Message passing to the reader thread; eg seek requests
152
    int errcode;
153
    uint64_t block_address;
154
    int eof;
155
    pthread_mutex_t command_m; // Set whenever fp is being updated
156
    pthread_cond_t command_c;
157
    enum mtaux_cmd command;
158
159
    // For multi-threaded on-the-fly indexing. See bgzf_idx_push below.
160
    pthread_mutex_t idx_m;
161
    hts_idx_t *hts_idx;
162
    uint64_t block_number, block_written;
163
    hts_idx_cache_t idx_cache;
164
} mtaux_t;
165
#endif
166
167
typedef struct
168
{
169
    uint64_t uaddr;  // offset w.r.t. uncompressed data
170
    uint64_t caddr;  // offset w.r.t. compressed data
171
}
172
bgzidx1_t;
173
174
struct bgzidx_t
175
{
176
    int noffs, moffs;       // the size of the index, n:used, m:allocated
177
    bgzidx1_t *offs;        // offsets
178
    uint64_t ublock_addr;   // offset of the current block (uncompressed data)
179
};
180
181
/*
182
 * Buffers up arguments to hts_idx_push for later use, once we've written all bar
183
 * this block.  This is necessary when multiple blocks are in flight (threading)
184
 * and fp->block_address isn't known at the time of call as we have in-flight
185
 * blocks that haven't yet been compressed.
186
 *
187
 * NB: this only matters when we're indexing on the fly (writing).
188
 * Normal indexing uses threaded reads, where we already know the block sizes,
189
 * so it's a simpler process
190
 *
191
 * Returns 0 on success,
192
 *        -1 on failure
193
 */
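The offsets handled here are BGZF virtual file offsets: the compressed start of the block in the upper bits and the position within the uncompressed block in the low 16 bits, which is why only `offset & 0xffff` is cached below and the block address is patched in later by bgzf_idx_flush(). A small sketch of the packing (illustrative only, not part of bgzf.c):

    #include <stdint.h>

    /* Sketch: BGZF virtual offsets pack the compressed block start into the
     * upper 48 bits and the within-block (uncompressed) offset into the low
     * 16 bits.  Only the low 16 bits are known before a block has been
     * compressed, which is what bgzf_idx_push() has to cache. */
    static inline uint64_t make_voffset(uint64_t block_caddr, uint16_t uoffset)
    {
        return (block_caddr << 16) | uoffset;
    }
    static inline uint64_t voffset_caddr(uint64_t v)   { return v >> 16; }
    static inline uint16_t voffset_uoffset(uint64_t v) { return v & 0xffff; }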
194
0
int bgzf_idx_push(BGZF *fp, hts_idx_t *hidx, int tid, hts_pos_t beg, hts_pos_t end, uint64_t offset, int is_mapped) {
195
0
    hts_idx_cache_entry *e;
196
0
    mtaux_t *mt = fp->mt;
197
198
0
    if (!mt)
199
0
        return hts_idx_push(hidx, tid, beg, end, offset, is_mapped);
200
201
    // Early check for out of range positions which would fail in hts_idx_push()
202
0
    if (hts_idx_check_range(hidx, tid, beg, end) < 0)
203
0
        return -1;
204
205
0
    pthread_mutex_lock(&mt->idx_m);
206
207
0
    mt->hts_idx = hidx;
208
0
    hts_idx_cache_t *ic = &mt->idx_cache;
209
210
0
    if (ic->nentries >= ic->mentries) {
211
0
        int new_sz = ic->mentries ? ic->mentries*2 : 1024;
212
0
        if (!(e = realloc(ic->e, new_sz * sizeof(*ic->e)))) {
213
0
            pthread_mutex_unlock(&mt->idx_m);
214
0
            return -1;
215
0
        }
216
0
        ic->e = e;
217
0
        ic->mentries = new_sz;
218
0
    }
219
220
0
    e = &ic->e[ic->nentries++];
221
0
    e->tid = tid;
222
0
    e->beg = beg;
223
0
    e->end = end;
224
0
    e->is_mapped = is_mapped;
225
0
    e->offset = offset & 0xffff;
226
0
    e->block_number = mt->block_number;
227
228
0
    pthread_mutex_unlock(&mt->idx_m);
229
230
0
    return 0;
231
0
}
232
233
static int bgzf_idx_flush(BGZF *fp,
234
0
                          size_t block_uncomp_len, size_t block_comp_len) {
235
0
    mtaux_t *mt = fp->mt;
236
237
0
    if (!mt->idx_cache.e) {
238
0
        mt->block_written++;
239
0
        return 0;
240
0
    }
241
242
0
    pthread_mutex_lock(&mt->idx_m);
243
244
0
    hts_idx_cache_entry *e = mt->idx_cache.e;
245
0
    int i;
246
247
0
    assert(mt->idx_cache.nentries == 0 || mt->block_written <= e[0].block_number);
248
249
0
    for (i = 0; i < mt->idx_cache.nentries && e[i].block_number == mt->block_written; i++) {
250
0
        if (block_uncomp_len > 0 && e[i].offset == block_uncomp_len) {
251
            /*
252
             * If the virtual offset is at the end of the current block,
253
             * adjust it to point to the start of the next one.  This
254
             * is needed when on-the-fly indexing has recorded a virtual
255
             * offset just before a new block has been started, and makes
256
             * on-the-fly and standard indexing give exactly the same results.
257
             *
258
             * In theory the two virtual offsets are equivalent, but pointing
259
             * to the end of a block is inefficient, and caused problems with
260
             * versions of HTSlib before 1.11 where bgzf_read() would
261
             * incorrectly return EOF.
262
             */
263
264
            // Assert that this is the last entry for the current block_number
265
0
            assert(i == mt->idx_cache.nentries - 1
266
0
                   || e[i].block_number < e[i + 1].block_number);
267
268
            // Work out where the next block starts.  For this entry, the
269
            // offset will be zero.
270
0
            uint64_t next_block_addr = mt->block_address + block_comp_len;
271
0
            if (hts_idx_push(mt->hts_idx, e[i].tid, e[i].beg, e[i].end,
272
0
                             next_block_addr << 16, e[i].is_mapped) < 0) {
273
0
                pthread_mutex_unlock(&mt->idx_m);
274
0
                return -1;
275
0
            }
276
            // Count this entry and drop out of the loop
277
0
            i++;
278
0
            break;
279
0
        }
280
281
0
        if (hts_idx_push(mt->hts_idx, e[i].tid, e[i].beg, e[i].end,
282
0
                         (mt->block_address << 16) + e[i].offset,
283
0
                         e[i].is_mapped) < 0) {
284
0
            pthread_mutex_unlock(&mt->idx_m);
285
0
            return -1;
286
0
        }
287
0
    }
288
289
0
    memmove(&e[0], &e[i], (mt->idx_cache.nentries - i) * sizeof(*e));
290
0
    mt->idx_cache.nentries -= i;
291
0
    mt->block_written++;
292
293
0
    pthread_mutex_unlock(&mt->idx_m);
294
0
    return 0;
295
0
}
296
297
void bgzf_index_destroy(BGZF *fp);
298
int bgzf_index_add_block(BGZF *fp);
299
static int mt_destroy(mtaux_t *mt);
300
301
static inline void packInt16(uint8_t *buffer, uint16_t value)
302
44.3k
{
303
44.3k
    buffer[0] = value;
304
44.3k
    buffer[1] = value >> 8;
305
44.3k
}
306
307
static inline int unpackInt16(const uint8_t *buffer)
308
1.04k
{
309
1.04k
    return buffer[0] | buffer[1] << 8;
310
1.04k
}
311
312
static inline void packInt32(uint8_t *buffer, uint32_t value)
313
88.6k
{
314
88.6k
    buffer[0] = value;
315
88.6k
    buffer[1] = value >> 8;
316
88.6k
    buffer[2] = value >> 16;
317
88.6k
    buffer[3] = value >> 24;
318
88.6k
}
319
320
static void razf_info(hFILE *hfp, const char *filename)
321
84
{
322
84
    uint64_t usize, csize;
323
84
    off_t sizes_pos;
324
325
84
    if (filename == NULL || strcmp(filename, "-") == 0) filename = "FILE";
326
327
    // RAZF files end with USIZE,CSIZE stored as big-endian uint64_t
328
84
    if ((sizes_pos = hseek(hfp, -16, SEEK_END)) < 0) goto no_sizes;
329
84
    if (hread(hfp, &usize, 8) != 8 || hread(hfp, &csize, 8) != 8) goto no_sizes;
330
84
    if (!ed_is_big()) ed_swap_8p(&usize), ed_swap_8p(&csize);
331
84
    if (csize >= sizes_pos) goto no_sizes; // Very basic validity check
332
333
1
    hts_log_error(
334
1
"To decompress this file, use the following commands:\n"
335
1
"    truncate -s %" PRIu64 " %s\n"
336
1
"    gunzip %s\n"
337
1
"The resulting uncompressed file should be %" PRIu64 " bytes in length.\n"
338
1
"If you do not have a truncate command, skip that step (though gunzip will\n"
339
1
"likely produce a \"trailing garbage ignored\" message, which can be ignored).",
340
1
                  csize, filename, filename, usize);
341
1
    return;
342
343
83
no_sizes:
344
83
    hts_log_error(
345
83
"To decompress this file, use the following command:\n"
346
83
"    gunzip %s\n"
347
83
"This will likely produce a \"trailing garbage ignored\" message, which can\n"
348
83
"usually be safely ignored.", filename);
349
83
}
350
351
static const char *bgzf_zerr(int errnum, z_stream *zs)
352
158
{
353
158
    static char buffer[32];
354
355
    /* Return zs->msg if available.
356
       zlib doesn't set this very reliably.  Looking at the source suggests
357
       that it may get set to a useful message for deflateInit2, inflateInit2
358
       and inflate when it returns Z_DATA_ERROR. For inflate with other
359
       return codes, deflate, deflateEnd and inflateEnd it doesn't appear
360
       to be useful.  For the likely non-useful cases, the caller should
361
       pass NULL into zs. */
362
363
158
    if (zs && zs->msg) return zs->msg;
364
365
    // gzerror OF((gzFile file, int *errnum))
366
6
    switch (errnum) {
367
6
    case Z_ERRNO:
368
6
        return strerror(errno);
369
0
    case Z_STREAM_ERROR:
370
0
        return "invalid parameter/compression level, or inconsistent stream state";
371
0
    case Z_DATA_ERROR:
372
0
        return "invalid or incomplete IO";
373
0
    case Z_MEM_ERROR:
374
0
        return "out of memory";
375
0
    case Z_BUF_ERROR:
376
0
        return "progress temporarily not possible, or in() / out() returned an error";
377
0
    case Z_VERSION_ERROR:
378
0
        return "zlib version mismatch";
379
0
    case Z_NEED_DICT:
380
0
        return "data was compressed using a dictionary";
381
0
    case Z_OK: // 0: maybe gzgets error Z_NULL
382
0
    default:
383
0
        snprintf(buffer, sizeof(buffer), "[%d] unknown", errnum);
384
0
        return buffer;  // FIXME: Not thread-safe.
385
6
    }
386
6
}
387
388
static BGZF *bgzf_read_init(hFILE *hfpr, const char *filename)
389
6.93k
{
390
6.93k
    BGZF *fp;
391
6.93k
    uint8_t magic[18];
392
6.93k
    ssize_t n = hpeek(hfpr, magic, 18);
393
6.93k
    if (n < 0) return NULL;
394
395
6.92k
    fp = (BGZF*)calloc(1, sizeof(BGZF));
396
6.92k
    if (fp == NULL) return NULL;
397
398
6.92k
    fp->is_write = 0;
399
6.92k
    fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
400
6.92k
    if (fp->uncompressed_block == NULL) { free(fp); return NULL; }
401
6.92k
    fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
402
6.92k
    fp->is_compressed = (n==18 && magic[0]==0x1f && magic[1]==0x8b);
403
6.92k
    fp->is_gzip = ( !fp->is_compressed || ((magic[3]&4) && memcmp(&magic[12], "BC\2\0",4)==0) ) ? 0 : 1;
404
6.92k
    if (fp->is_compressed && (magic[3]&4) && memcmp(&magic[12], "RAZF", 4)==0) {
405
84
        hts_log_error("Cannot decompress legacy RAZF format");
406
84
        razf_info(hfpr, filename);
407
84
        free(fp->uncompressed_block);
408
84
        free(fp);
409
84
        errno = EFTYPE;
410
84
        return NULL;
411
84
    }
412
6.84k
#ifdef BGZF_CACHE
413
6.84k
    if (!(fp->cache = malloc(sizeof(*fp->cache)))) {
414
0
        free(fp->uncompressed_block);
415
0
        free(fp);
416
0
        return NULL;
417
0
    }
418
6.84k
    if (!(fp->cache->h = kh_init(cache))) {
419
0
        free(fp->uncompressed_block);
420
0
        free(fp->cache);
421
0
        free(fp);
422
0
        return NULL;
423
0
    }
424
6.84k
    fp->cache->last_pos = 0;
425
6.84k
#endif
426
6.84k
    return fp;
427
6.84k
}
428
429
// Get the compression level from the mode string: compress_level==-1 for the default level, -2 for plain uncompressed
430
static int mode2level(const char *mode)
431
11.7k
{
432
11.7k
    int i, compress_level = -1;
433
35.2k
    for (i = 0; mode[i]; ++i)
434
23.5k
        if (mode[i] >= '0' && mode[i] <= '9') break;
435
11.7k
    if (mode[i]) compress_level = (int)mode[i] - '0';
436
11.7k
    if (strchr(mode, 'u')) compress_level = -2;
437
11.7k
    return compress_level;
438
11.7k
}
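For example (a hypothetical self-check, not part of bgzf.c), the mapping from typical mode strings to the levels described above would be:

    #include <assert.h>

    /* Sketch: expected mode2level() results for common mode strings,
     * per the logic above (-1 = default level, -2 = plain uncompressed). */
    static void check_mode2level(void)
    {
        assert(mode2level("w")  == -1);  /* default -> Z_DEFAULT_COMPRESSION */
        assert(mode2level("w9") ==  9);  /* explicit numeric level */
        assert(mode2level("w0") ==  0);  /* level 0: stored deflate blocks */
        assert(mode2level("wu") == -2);  /* 'u': no compression at all */
    }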
439
static BGZF *bgzf_write_init(const char *mode)
440
11.7k
{
441
11.7k
    BGZF *fp;
442
11.7k
    fp = (BGZF*)calloc(1, sizeof(BGZF));
443
11.7k
    if (fp == NULL) goto mem_fail;
444
11.7k
    fp->is_write = 1;
445
11.7k
    int compress_level = mode2level(mode);
446
11.7k
    if ( compress_level==-2 )
447
0
    {
448
0
        fp->is_compressed = 0;
449
0
        return fp;
450
0
    }
451
11.7k
    fp->is_compressed = 1;
452
453
11.7k
    fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
454
11.7k
    if (fp->uncompressed_block == NULL) goto mem_fail;
455
11.7k
    fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
456
457
11.7k
    fp->compress_level = compress_level < 0? Z_DEFAULT_COMPRESSION : compress_level; // Z_DEFAULT_COMPRESSION==-1
458
11.7k
    if (fp->compress_level > 9) fp->compress_level = Z_DEFAULT_COMPRESSION;
459
11.7k
    if ( strchr(mode,'g') )
460
0
    {
461
        // gzip output
462
0
        fp->is_gzip = 1;
463
0
        fp->gz_stream = (z_stream*)calloc(1,sizeof(z_stream));
464
0
        if (fp->gz_stream == NULL) goto mem_fail;
465
0
        fp->gz_stream->zalloc = NULL;
466
0
        fp->gz_stream->zfree  = NULL;
467
0
        fp->gz_stream->msg    = NULL;
468
469
0
        int ret = deflateInit2(fp->gz_stream, fp->compress_level, Z_DEFLATED, 15|16, 8, Z_DEFAULT_STRATEGY);
470
0
        if (ret!=Z_OK) {
471
0
            hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret, fp->gz_stream));
472
0
            goto fail;
473
0
        }
474
0
    }
475
11.7k
    return fp;
476
477
0
mem_fail:
478
0
    hts_log_error("%s", strerror(errno));
479
480
0
fail:
481
0
    if (fp != NULL) {
482
0
        free(fp->uncompressed_block);
483
0
        free(fp->gz_stream);
484
0
        free(fp);
485
0
    }
486
0
    return NULL;
487
0
}
488
489
BGZF *bgzf_open(const char *path, const char *mode)
490
1.01k
{
491
1.01k
    BGZF *fp = 0;
492
1.01k
    if (strchr(mode, 'r')) {
493
1.01k
        hFILE *fpr;
494
1.01k
        if ((fpr = hopen(path, mode)) == 0) return 0;
495
442
        fp = bgzf_read_init(fpr, path);
496
442
        if (fp == 0) { hclose_abruptly(fpr); return NULL; }
497
424
        fp->fp = fpr;
498
424
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
499
0
        hFILE *fpw;
500
0
        if ((fpw = hopen(path, mode)) == 0) return 0;
501
0
        fp = bgzf_write_init(mode);
502
0
        if (fp == NULL) return NULL;
503
0
        fp->fp = fpw;
504
0
    }
505
0
    else { errno = EINVAL; return 0; }
506
507
424
    fp->is_be = ed_is_big();
508
424
    return fp;
509
1.01k
}
510
511
BGZF *bgzf_dopen(int fd, const char *mode)
512
0
{
513
0
    BGZF *fp = 0;
514
0
    if (strchr(mode, 'r')) {
515
0
        hFILE *fpr;
516
0
        if ((fpr = hdopen(fd, mode)) == 0) return 0;
517
0
        fp = bgzf_read_init(fpr, NULL);
518
0
        if (fp == 0) { hclose_abruptly(fpr); return NULL; } // FIXME this closes fd
519
0
        fp->fp = fpr;
520
0
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
521
0
        hFILE *fpw;
522
0
        if ((fpw = hdopen(fd, mode)) == 0) return 0;
523
0
        fp = bgzf_write_init(mode);
524
0
        if (fp == NULL) return NULL;
525
0
        fp->fp = fpw;
526
0
    }
527
0
    else { errno = EINVAL; return 0; }
528
529
0
    fp->is_be = ed_is_big();
530
0
    return fp;
531
0
}
532
533
BGZF *bgzf_hopen(hFILE *hfp, const char *mode)
534
18.2k
{
535
18.2k
    BGZF *fp = NULL;
536
18.2k
    if (strchr(mode, 'r')) {
537
6.49k
        fp = bgzf_read_init(hfp, NULL);
538
6.49k
        if (fp == NULL) return NULL;
539
11.7k
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
540
11.7k
        fp = bgzf_write_init(mode);
541
11.7k
        if (fp == NULL) return NULL;
542
11.7k
    }
543
0
    else { errno = EINVAL; return 0; }
544
545
18.1k
    fp->fp = hfp;
546
18.1k
    fp->is_be = ed_is_big();
547
18.1k
    return fp;
548
18.2k
}
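For context, a usage sketch of the public open/read/close cycle these init helpers back (illustrative only; dump_head is a hypothetical caller and error handling is abbreviated):

    #include <stdio.h>
    #include "htslib/bgzf.h"

    /* Sketch: read the first bytes of a BGZF/gzip-compressed file through
     * the bgzf_open/bgzf_read/bgzf_close API defined in this file. */
    static int dump_head(const char *path)
    {
        BGZF *fp = bgzf_open(path, "r");
        if (!fp) return -1;

        char buf[256];
        ssize_t n = bgzf_read(fp, buf, sizeof(buf));
        if (n < 0) { bgzf_close(fp); return -1; }

        fwrite(buf, 1, (size_t)n, stdout);
        return bgzf_close(fp);   /* 0 on success, negative on error */
    }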
549
550
#ifdef HAVE_LIBDEFLATE
551
uint32_t hts_crc32(uint32_t crc, const void *buf, size_t len) {
552
    return libdeflate_crc32(crc, buf, len);
553
}
554
555
int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
556
{
557
    if (slen == 0) {
558
        // EOF block
559
        if (*dlen < 28) return -1;
560
        memcpy(_dst, "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", 28);
561
        *dlen = 28;
562
        return 0;
563
    }
564
565
    uint8_t *dst = (uint8_t*)_dst;
566
567
    if (level == 0) {
568
        // Uncompressed data
569
        if (*dlen < slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH) return -1;
570
        dst[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
571
        u16_to_le(slen,  &dst[BLOCK_HEADER_LENGTH+1]); // length
572
        u16_to_le(~slen, &dst[BLOCK_HEADER_LENGTH+3]); // ones-complement length
573
        memcpy(dst + BLOCK_HEADER_LENGTH+5, src, slen);
574
        *dlen = slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
575
576
    } else {
577
        level = level > 0 ? level : 6; // libdeflate doesn't honour -1 as default
578
        // NB levels go up to 12 here.
579
        int lvl_map[] = {0,1,2,3,5,6,7,8,10,12};
580
        level = lvl_map[level>9 ?9 :level];
581
        struct libdeflate_compressor *z = libdeflate_alloc_compressor(level);
582
        if (!z) return -1;
583
584
        // Raw deflate
585
        size_t clen =
586
            libdeflate_deflate_compress(z, src, slen,
587
                                        dst + BLOCK_HEADER_LENGTH,
588
                                        *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH);
589
590
        if (clen <= 0) {
591
            hts_log_error("Call to libdeflate_deflate_compress failed");
592
            libdeflate_free_compressor(z);
593
            return -1;
594
        }
595
596
        *dlen = clen + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
597
598
        libdeflate_free_compressor(z);
599
    }
600
601
    // write the header
602
    memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
603
    packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
604
605
    // write the footer
606
    uint32_t crc = libdeflate_crc32(0, src, slen);
607
    packInt32((uint8_t*)&dst[*dlen - 8], crc);
608
    packInt32((uint8_t*)&dst[*dlen - 4], slen);
609
    return 0;
610
}
611
612
#else
613
614
0
uint32_t hts_crc32(uint32_t crc, const void *buf, size_t len) {
615
0
    return crc32(crc, buf, len);
616
0
}
617
618
int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
619
44.3k
{
620
44.3k
    uint32_t crc;
621
44.3k
    z_stream zs;
622
44.3k
    uint8_t *dst = (uint8_t*)_dst;
623
624
44.3k
    if (level == 0) {
625
0
    uncomp:
626
        // Uncompressed data
627
0
        if (*dlen < slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH) return -1;
628
0
        dst[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
629
0
        u16_to_le(slen,  &dst[BLOCK_HEADER_LENGTH+1]); // length
630
0
        u16_to_le(~slen, &dst[BLOCK_HEADER_LENGTH+3]); // ones-complement length
631
0
        memcpy(dst + BLOCK_HEADER_LENGTH+5, src, slen);
632
0
        *dlen = slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
633
44.3k
    } else {
634
        // compress the body
635
44.3k
        zs.zalloc = NULL; zs.zfree = NULL;
636
44.3k
        zs.msg = NULL;
637
44.3k
        zs.next_in  = (Bytef*)src;
638
44.3k
        zs.avail_in = slen;
639
44.3k
        zs.next_out = dst + BLOCK_HEADER_LENGTH;
640
44.3k
        zs.avail_out = *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
641
44.3k
        int ret = deflateInit2(&zs, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); // -15 to disable zlib header/footer
642
44.3k
        if (ret!=Z_OK) {
643
0
            hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret, &zs));
644
0
            return -1;
645
0
        }
646
44.3k
        if ((ret = deflate(&zs, Z_FINISH)) != Z_STREAM_END) {
647
0
            if (ret == Z_OK && zs.avail_out == 0) {
648
0
                deflateEnd(&zs);
649
0
                goto uncomp;
650
0
            } else {
651
0
                hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
652
0
            }
653
0
            return -1;
654
0
        }
655
        // If we used up the entire output buffer, then we either ran out of
656
        // room or we *just* fitted, but either way we may as well store
657
        // uncompressed for faster decode.
658
44.3k
        if (zs.avail_out == 0) {
659
0
            deflateEnd(&zs);
660
0
            goto uncomp;
661
0
        }
662
44.3k
        if ((ret = deflateEnd(&zs)) != Z_OK) {
663
0
            hts_log_error("Call to deflateEnd failed: %s", bgzf_zerr(ret, NULL));
664
0
            return -1;
665
0
        }
666
44.3k
        *dlen = zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
667
44.3k
    }
668
669
    // write the header
670
44.3k
    memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
671
44.3k
    packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
672
    // write the footer
673
44.3k
    crc = crc32(crc32(0L, NULL, 0L), (Bytef*)src, slen);
674
44.3k
    packInt32((uint8_t*)&dst[*dlen - 8], crc);
675
44.3k
    packInt32((uint8_t*)&dst[*dlen - 4], slen);
676
44.3k
    return 0;
677
44.3k
}
678
#endif // HAVE_LIBDEFLATE
679
680
static int bgzf_gzip_compress(BGZF *fp, void *_dst, size_t *dlen, const void *src, size_t slen, int level)
681
0
{
682
0
    uint8_t *dst = (uint8_t*)_dst;
683
0
    z_stream *zs = fp->gz_stream;
684
0
    int flush = slen ? Z_PARTIAL_FLUSH : Z_FINISH;
685
0
    zs->next_in   = (Bytef*)src;
686
0
    zs->avail_in  = slen;
687
0
    zs->next_out  = dst;
688
0
    zs->avail_out = *dlen;
689
0
    int ret = deflate(zs, flush);
690
0
    if (ret == Z_STREAM_ERROR) {
691
0
        hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret, NULL));
692
0
        return -1;
693
0
    }
694
0
    if (zs->avail_in != 0) {
695
0
        hts_log_error("Deflate block too large for output buffer");
696
0
        return -1;
697
0
    }
698
0
    *dlen = *dlen - zs->avail_out;
699
0
    return 0;
700
0
}
701
702
// Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
703
static int deflate_block(BGZF *fp, int block_length)
704
44.3k
{
705
44.3k
    size_t comp_size = BGZF_MAX_BLOCK_SIZE;
706
44.3k
    int ret;
707
44.3k
    if ( !fp->is_gzip )
708
44.3k
        ret = bgzf_compress(fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
709
0
    else
710
0
        ret = bgzf_gzip_compress(fp, fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
711
712
44.3k
    if ( ret != 0 )
713
0
    {
714
0
        hts_log_debug("Compression error %d", ret);
715
0
        fp->errcode |= BGZF_ERR_ZLIB;
716
0
        return -1;
717
0
    }
718
44.3k
    fp->block_offset = 0;
719
44.3k
    return comp_size;
720
44.3k
}
721
722
#ifdef HAVE_LIBDEFLATE
723
724
static int bgzf_uncompress(uint8_t *dst, size_t *dlen,
725
                           const uint8_t *src, size_t slen,
726
                           uint32_t expected_crc) {
727
    struct libdeflate_decompressor *z = libdeflate_alloc_decompressor();
728
    if (!z) {
729
        hts_log_error("Call to libdeflate_alloc_decompressor failed");
730
        return -1;
731
    }
732
733
    int ret = libdeflate_deflate_decompress(z, src, slen, dst, *dlen, dlen);
734
    libdeflate_free_decompressor(z);
735
736
    if (ret != LIBDEFLATE_SUCCESS) {
737
        hts_log_error("Inflate operation failed: %d", ret);
738
        return -1;
739
    }
740
741
    uint32_t crc = libdeflate_crc32(0, (unsigned char *)dst, *dlen);
742
#ifdef FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION
743
    // Pretend the CRC was OK so the fuzzer doesn't have to get it right
744
    crc = expected_crc;
745
#endif
746
    if (crc != expected_crc) {
747
        hts_log_error("CRC32 checksum mismatch");
748
        return -2;
749
    }
750
751
    return 0;
752
}
753
754
#else
755
756
static int bgzf_uncompress(uint8_t *dst, size_t *dlen,
757
                           const uint8_t *src, size_t slen,
758
99
                           uint32_t expected_crc) {
759
99
    z_stream zs = {
760
99
        .zalloc = NULL,
761
99
        .zfree = NULL,
762
99
        .msg = NULL,
763
99
        .next_in = (Bytef*)src,
764
99
        .avail_in = slen,
765
99
        .next_out = (Bytef*)dst,
766
99
        .avail_out = *dlen
767
99
    };
768
769
99
    int ret = inflateInit2(&zs, -15);
770
99
    if (ret != Z_OK) {
771
0
        hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret, &zs));
772
0
        return -1;
773
0
    }
774
99
    if ((ret = inflate(&zs, Z_FINISH)) != Z_STREAM_END) {
775
6
        hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
776
6
        if ((ret = inflateEnd(&zs)) != Z_OK) {
777
0
            hts_log_warning("Call to inflateEnd failed: %s", bgzf_zerr(ret, NULL));
778
0
        }
779
6
        return -1;
780
6
    }
781
93
    if ((ret = inflateEnd(&zs)) != Z_OK) {
782
0
        hts_log_error("Call to inflateEnd failed: %s", bgzf_zerr(ret, NULL));
783
0
        return -1;
784
0
    }
785
93
    *dlen = *dlen - zs.avail_out;
786
787
93
    uint32_t crc = crc32(crc32(0L, NULL, 0L), (unsigned char *)dst, *dlen);
788
93
#ifdef FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION
789
    // Pretend the CRC was OK so the fuzzer doesn't have to get it right
790
93
    crc = expected_crc;
791
93
#endif
792
93
    if (crc != expected_crc) {
793
0
        hts_log_error("CRC32 checksum mismatch");
794
0
        return -2;
795
0
    }
796
797
93
    return 0;
798
93
}
799
#endif // HAVE_LIBDEFLATE
800
801
// Inflate the block in fp->compressed_block into fp->uncompressed_block
802
static int inflate_block(BGZF* fp, int block_length)
803
99
{
804
99
    size_t dlen = BGZF_MAX_BLOCK_SIZE;
805
99
    uint32_t crc = le_to_u32((uint8_t *)fp->compressed_block + block_length-8);
806
99
    int ret = bgzf_uncompress(fp->uncompressed_block, &dlen,
807
99
                              (Bytef*)fp->compressed_block + 18,
808
99
                              block_length - 18, crc);
809
99
    if (ret < 0) {
810
6
        if (ret == -2)
811
0
            fp->errcode |= BGZF_ERR_CRC;
812
6
        else
813
6
            fp->errcode |= BGZF_ERR_ZLIB;
814
6
        return -1;
815
6
    }
816
817
93
    return dlen;
818
99
}
819
820
// Decompress the next part of a non-blocked GZIP file.
821
// Return the number of uncompressed bytes read, 0 on EOF, or a negative number on error.
822
// Will fill the output buffer unless the end of the GZIP file is reached.
823
static int inflate_gzip_block(BGZF *fp)
824
48.5k
{
825
    // we will set this to true when we detect EOF, so we don't bang against the EOF more than once per call
826
48.5k
    int input_eof = 0;
827
828
    // write to the part of the output buffer after block_offset
829
48.5k
    fp->gz_stream->next_out = (Bytef*)fp->uncompressed_block + fp->block_offset;
830
48.5k
    fp->gz_stream->avail_out = BGZF_MAX_BLOCK_SIZE - fp->block_offset;
831
832
97.7k
    while ( fp->gz_stream->avail_out != 0 ) {
833
        // until we fill the output buffer (or hit EOF)
834
835
49.8k
        if ( !input_eof && fp->gz_stream->avail_in == 0 ) {
836
            // we are out of input data in the buffer. Get more.
837
1.22k
            fp->gz_stream->next_in = fp->compressed_block;
838
1.22k
            int ret = hread(fp->fp, fp->compressed_block, BGZF_BLOCK_SIZE);
839
1.22k
            if ( ret < 0 ) {
840
                // hread had an error. Pass it on.
841
0
                return ret;
842
0
            }
843
1.22k
            fp->gz_stream->avail_in = ret;
844
1.22k
            if ( fp->gz_stream->avail_in < BGZF_BLOCK_SIZE ) {
845
                // we have reached EOF on the input, but the decompressor hasn't necessarily finished yet
846
770
                input_eof = 1;
847
770
            }
848
1.22k
        }
849
850
49.8k
        fp->gz_stream->msg = NULL;
851
        // decompress as much data as we can
852
49.8k
        int ret = inflate(fp->gz_stream, Z_SYNC_FLUSH);
853
854
49.8k
        if ( (ret < 0 && ret != Z_BUF_ERROR) || ret == Z_NEED_DICT ) {
855
            // an error occurred, other than running out of space
856
146
            hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? fp->gz_stream : NULL));
857
146
            fp->errcode |= BGZF_ERR_ZLIB;
858
146
            return -1;
859
49.6k
        } else if ( ret == Z_STREAM_END ) {
860
            // we finished a GZIP member
861
862
            // scratch for peeking to see if the file is over
863
14
            char c;
864
14
            if (fp->gz_stream->avail_in > 0 || hpeek(fp->fp, &c, 1) == 1) {
865
                // there is more data; try and read another GZIP member in the remaining data
866
14
                int reset_ret = inflateReset(fp->gz_stream);
867
14
                if (reset_ret != Z_OK) {
868
0
                    hts_log_error("Call to inflateReset failed: %s", bgzf_zerr(reset_ret, NULL));
869
0
                    fp->errcode |= BGZF_ERR_ZLIB;
870
0
                    return -1;
871
0
                }
872
14
            } else {
873
                // we consumed all the input data and hit Z_STREAM_END
874
                // so stop looping, even if we never fill the output buffer
875
0
                break;
876
0
            }
877
49.6k
        } else if ( ret == Z_BUF_ERROR && input_eof && fp->gz_stream->avail_out > 0 ) {
878
            // the gzip file has ended prematurely
879
425
            hts_log_error("Gzip file truncated");
880
425
            fp->errcode |= BGZF_ERR_IO;
881
425
            return -1;
882
425
        }
883
49.8k
    }
884
885
    // when we get here, the buffer is full or there is an EOF after a complete gzip member
886
47.9k
    return BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
887
48.5k
}
888
889
// Returns: 0 on success (BGZF header); -1 on non-BGZF GZIP header; -2 on error
890
static int check_header(const uint8_t *header)
891
1.22k
{
892
1.22k
    if ( header[0] != 31 || header[1] != 139 || header[2] != 8 ) return -2;
893
1.20k
    return ((header[3] & 4) != 0
894
1.20k
            && unpackInt16((uint8_t*)&header[10]) == 6
895
1.20k
            && header[12] == 'B' && header[13] == 'C'
896
1.20k
            && unpackInt16((uint8_t*)&header[14]) == 2) ? 0 : -1;
897
1.22k
}
898
899
#ifdef BGZF_CACHE
900
static void free_cache(BGZF *fp)
901
18.5k
{
902
18.5k
    khint_t k;
903
18.5k
    if (fp->is_write) return;
904
6.84k
    khash_t(cache) *h = fp->cache->h;
905
6.84k
    for (k = kh_begin(h); k < kh_end(h); ++k)
906
0
        if (kh_exist(h, k)) free(kh_val(h, k).block);
907
6.84k
    kh_destroy(cache, h);
908
6.84k
    free(fp->cache);
909
6.84k
}
910
911
static int load_block_from_cache(BGZF *fp, int64_t block_address)
912
0
{
913
0
    khint_t k;
914
0
    cache_t *p;
915
916
0
    khash_t(cache) *h = fp->cache->h;
917
0
    k = kh_get(cache, h, block_address);
918
0
    if (k == kh_end(h)) return 0;
919
0
    p = &kh_val(h, k);
920
0
    if (fp->block_length != 0) fp->block_offset = 0;
921
0
    fp->block_address = block_address;
922
0
    fp->block_length = p->size;
923
0
    memcpy(fp->uncompressed_block, p->block, p->size);
924
0
    if ( hseek(fp->fp, p->end_offset, SEEK_SET) < 0 )
925
0
    {
926
        // todo: move the error up
927
0
        hts_log_error("Could not hseek to %" PRId64, p->end_offset);
928
0
        exit(1);
929
0
    }
930
0
    return p->size;
931
0
}
932
933
static void cache_block(BGZF *fp, int size)
934
87
{
935
87
    int ret;
936
87
    khint_t k, k_orig;
937
87
    uint8_t *block = NULL;
938
87
    cache_t *p;
939
    //fprintf(stderr, "Cache block at %llx\n", (int)fp->block_address);
940
87
    khash_t(cache) *h = fp->cache->h;
941
87
    if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return;
942
0
    if (fp->block_length < 0 || fp->block_length > BGZF_MAX_BLOCK_SIZE) return;
943
0
    if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > (uint32_t)fp->cache_size) {
944
        /* Remove uniformly from any position in the hash by a simple
945
         * round-robin approach.  An alternative strategy would be to
946
         * remove the least recently accessed block, but the round-robin
947
         * removal is simpler and is not expected to have a big impact
948
         * on performance */
949
0
        if (fp->cache->last_pos >= kh_end(h)) fp->cache->last_pos = kh_begin(h);
950
0
        k_orig = k = fp->cache->last_pos;
951
0
        if (++k >= kh_end(h)) k = kh_begin(h);
952
0
        while (k != k_orig) {
953
0
            if (kh_exist(h, k))
954
0
                break;
955
0
            if (++k == kh_end(h))
956
0
                k = kh_begin(h);
957
0
        }
958
0
        fp->cache->last_pos = k;
959
960
0
        if (k != k_orig) {
961
0
            block = kh_val(h, k).block;
962
0
            kh_del(cache, h, k);
963
0
        }
964
0
    } else {
965
0
        block = (uint8_t*)malloc(BGZF_MAX_BLOCK_SIZE);
966
0
    }
967
0
    if (!block) return;
968
0
    k = kh_put(cache, h, fp->block_address, &ret);
969
0
    if (ret <= 0) { // kh_put failed, or in there already (shouldn't happen)
970
0
        free(block);
971
0
        return;
972
0
    }
973
0
    p = &kh_val(h, k);
974
0
    p->size = fp->block_length;
975
0
    p->end_offset = fp->block_address + size;
976
0
    p->block = block;
977
0
    memcpy(p->block, fp->uncompressed_block, p->size);
978
0
}
979
#else
980
static void free_cache(BGZF *fp) {}
981
static int load_block_from_cache(BGZF *fp, int64_t block_address) {return 0;}
982
static void cache_block(BGZF *fp, int size) {}
983
#endif
984
985
/*
986
 * Absolute htell in this compressed file.
987
 *
988
 * Do not confuse with the external bgzf_tell macro which returns the virtual
989
 * offset.
990
 */
991
104k
static off_t bgzf_htell(BGZF *fp) {
992
104k
    if (fp->mt) {
993
0
        pthread_mutex_lock(&fp->mt->job_pool_m);
994
0
        off_t pos = fp->block_address + fp->block_clength;
995
0
        pthread_mutex_unlock(&fp->mt->job_pool_m);
996
0
        return pos;
997
104k
    } else {
998
104k
        return htell(fp->fp);
999
104k
    }
1000
104k
}
1001
1002
int bgzf_read_block(BGZF *fp)
1003
54.6k
{
1004
54.6k
    hts_tpool_result *r;
1005
1006
54.6k
    if (fp->errcode) return -1;
1007
1008
54.6k
    if (fp->mt) {
1009
0
    again:
1010
0
        if (fp->mt->hit_eof) {
1011
            // Further reading at EOF will always return 0
1012
0
            fp->block_length = 0;
1013
0
            return 0;
1014
0
        }
1015
0
        r = hts_tpool_next_result_wait(fp->mt->out_queue);
1016
0
        bgzf_job *j = r ? (bgzf_job *)hts_tpool_result_data(r) : NULL;
1017
1018
0
        if (!j || j->errcode == BGZF_ERR_MT) {
1019
0
            if (!fp->mt->free_block) {
1020
0
                fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
1021
0
                if (fp->uncompressed_block == NULL) return -1;
1022
0
                fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
1023
0
            } // else it's already allocated with malloc, maybe even in-use.
1024
0
            if (mt_destroy(fp->mt) < 0) {
1025
0
                fp->errcode = BGZF_ERR_IO;
1026
0
            }
1027
0
            fp->mt = NULL;
1028
0
            hts_tpool_delete_result(r, 0);
1029
0
            if (fp->errcode) {
1030
0
                return -1;
1031
0
            }
1032
0
            goto single_threaded;
1033
0
        }
1034
1035
0
        if (j->errcode) {
1036
0
            fp->errcode = j->errcode;
1037
0
            hts_log_error("BGZF decode jobs returned error %d "
1038
0
                          "for block offset %"PRId64,
1039
0
                          j->errcode, j->block_address);
1040
0
            hts_tpool_delete_result(r, 0);
1041
0
            return -1;
1042
0
        }
1043
1044
0
        if (j->hit_eof) {
1045
0
            if (!fp->last_block_eof && !fp->no_eof_block) {
1046
0
                fp->no_eof_block = 1;
1047
0
                hts_log_warning("EOF marker is absent. The input may be truncated");
1048
0
            }
1049
0
            fp->mt->hit_eof = 1;
1050
0
        }
1051
1052
        // Zero length blocks in the middle of a file are (wrongly)
1053
        // considered as EOF by many callers.  We work around this by
1054
        // trying again to see if we hit a genuine EOF.
1055
0
        if (!j->hit_eof && j->uncomp_len == 0) {
1056
0
            fp->last_block_eof = 1;
1057
0
            hts_tpool_delete_result(r, 0);
1058
0
            goto again;
1059
0
        }
1060
1061
        // block_length=0 and block_offset set by bgzf_seek.
1062
0
        if (fp->block_length != 0) fp->block_offset = 0;
1063
0
        if (!j->hit_eof) fp->block_address = j->block_address;
1064
0
        fp->block_clength = j->comp_len;
1065
0
        fp->block_length = j->uncomp_len;
1066
        // bgzf_read() can change fp->block_length
1067
0
        fp->last_block_eof = (fp->block_length == 0);
1068
1069
0
        if ( j->uncomp_len && j->fp->idx_build_otf )
1070
0
        {
1071
0
            bgzf_index_add_block(j->fp);
1072
0
            j->fp->idx->ublock_addr += j->uncomp_len;
1073
0
        }
1074
1075
        // Steal the data block as it's quicker than a memcpy.
1076
        // We just need to make sure we delay the pool free.
1077
0
        if (fp->mt->curr_job) {
1078
0
            pthread_mutex_lock(&fp->mt->job_pool_m);
1079
0
            pool_free(fp->mt->job_pool, fp->mt->curr_job);
1080
0
            pthread_mutex_unlock(&fp->mt->job_pool_m);
1081
0
        }
1082
0
        fp->uncompressed_block = j->uncomp_data;
1083
0
        fp->mt->curr_job = j;
1084
0
        if (fp->mt->free_block) {
1085
0
            free(fp->mt->free_block); // clear up last non-mt block
1086
0
            fp->mt->free_block = NULL;
1087
0
        }
1088
1089
0
        hts_tpool_delete_result(r, 0);
1090
0
        return 0;
1091
0
    }
1092
1093
54.6k
    uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
1094
54.6k
    int count, size, block_length, remaining;
1095
1096
54.6k
 single_threaded:
1097
54.6k
    size = 0;
1098
1099
54.6k
    int64_t block_address;
1100
54.6k
    block_address = bgzf_htell(fp);
1101
1102
    // Reading an uncompressed file
1103
54.6k
    if ( !fp->is_compressed )
1104
5.80k
    {
1105
5.80k
        count = hread(fp->fp, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
1106
5.80k
        if (count < 0)  // Error
1107
0
        {
1108
0
            hts_log_error("Failed to read uncompressed data "
1109
0
                          "at offset %"PRId64"%s%s",
1110
0
                          block_address, errno ? ": " : "", strerror(errno));
1111
0
            fp->errcode |= BGZF_ERR_IO;
1112
0
            return -1;
1113
0
        }
1114
5.80k
        else if (count == 0)  // EOF
1115
1.82k
        {
1116
1.82k
            fp->block_length = 0;
1117
1.82k
            return 0;
1118
1.82k
        }
1119
3.98k
        if (fp->block_length != 0) fp->block_offset = 0;
1120
3.98k
        fp->block_address = block_address;
1121
3.98k
        fp->block_length = count;
1122
3.98k
        return 0;
1123
5.80k
    }
1124
1125
    // Reading compressed file
1126
48.8k
    if ( fp->is_gzip && fp->gz_stream ) // is this is an initialized gzip stream?
1127
47.4k
    {
1128
47.4k
        count = inflate_gzip_block(fp);
1129
47.4k
        if ( count<0 )
1130
523
        {
1131
523
            hts_log_error("Reading GZIP stream failed at offset %"PRId64,
1132
523
                          block_address);
1133
523
            fp->errcode |= BGZF_ERR_ZLIB;
1134
523
            return -1;
1135
523
        }
1136
46.9k
        fp->block_length = count;
1137
46.9k
        fp->block_address = block_address;
1138
46.9k
        return 0;
1139
47.4k
    }
1140
1.34k
    if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
1141
1142
    // loop to skip empty bgzf blocks
1143
1.34k
    while (1)
1144
1.34k
    {
1145
1.34k
        count = hread(fp->fp, header, sizeof(header));
1146
1.34k
        if (count == 0) { // no data read
1147
102
            if (!fp->last_block_eof && !fp->no_eof_block && !fp->is_gzip) {
1148
39
                fp->no_eof_block = 1;
1149
39
                hts_log_warning("EOF marker is absent. The input may be truncated");
1150
39
            }
1151
102
            fp->block_length = 0;
1152
102
            return 0;
1153
102
        }
1154
1.24k
        int ret = 0;
1155
1.24k
        if ( count != sizeof(header) || (ret=check_header(header))==-2 )
1156
42
        {
1157
42
            fp->errcode |= BGZF_ERR_HEADER;
1158
42
            hts_log_error("%s BGZF header at offset %"PRId64,
1159
42
                          ret ? "Invalid" : "Failed to read",
1160
42
                          block_address);
1161
42
            return -1;
1162
42
        }
1163
1.20k
        if ( ret==-1 )
1164
1.07k
        {
1165
            // GZIP, not BGZF
1166
1.07k
            uint8_t *cblock = (uint8_t*)fp->compressed_block;
1167
1.07k
            memcpy(cblock, header, sizeof(header));
1168
1.07k
            count = hread(fp->fp, cblock+sizeof(header), BGZF_BLOCK_SIZE - sizeof(header)) + sizeof(header);
1169
1170
1.07k
            fp->is_gzip = 1;
1171
1.07k
            fp->gz_stream = (z_stream*) calloc(1,sizeof(z_stream));
1172
            // Set up zlib, using a window size of 15, and its built-in GZIP header processing (+16).
1173
1.07k
            int ret = inflateInit2(fp->gz_stream, 15 + 16);
1174
1.07k
            if (ret != Z_OK)
1175
0
            {
1176
0
                hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret, fp->gz_stream));
1177
0
                fp->errcode |= BGZF_ERR_ZLIB;
1178
0
                return -1;
1179
0
            }
1180
1.07k
            fp->gz_stream->avail_in = count;
1181
1.07k
            fp->gz_stream->next_in  = cblock;
1182
1.07k
            count = inflate_gzip_block(fp);
1183
1.07k
            if ( count<0 )
1184
48
            {
1185
48
                hts_log_error("Reading GZIP stream failed at offset %"PRId64,
1186
48
                              block_address);
1187
48
                fp->errcode |= BGZF_ERR_ZLIB;
1188
48
                return -1;
1189
48
            }
1190
1.02k
            fp->block_length = count;
1191
1.02k
            fp->block_address = block_address;
1192
1.02k
            if ( fp->idx_build_otf ) return -1; // cannot build index for gzip
1193
1.02k
            return 0;
1194
1.02k
        }
1195
125
        size = count;
1196
125
        block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1197
125
        if (block_length < BLOCK_HEADER_LENGTH)
1198
0
        {
1199
0
            hts_log_error("Invalid BGZF block length at offset %"PRId64,
1200
0
                          block_address);
1201
0
            fp->errcode |= BGZF_ERR_HEADER;
1202
0
            return -1;
1203
0
        }
1204
125
        compressed_block = (uint8_t*)fp->compressed_block;
1205
125
        memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1206
125
        remaining = block_length - BLOCK_HEADER_LENGTH;
1207
125
        count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1208
125
        if (count != remaining) {
1209
26
            hts_log_error("Failed to read BGZF block data at offset %"PRId64
1210
26
                          " expected %d bytes; hread returned %d",
1211
26
                          block_address, remaining, count);
1212
26
            fp->errcode |= BGZF_ERR_IO;
1213
26
            return -1;
1214
26
        }
1215
99
        size += count;
1216
99
        if ((count = inflate_block(fp, block_length)) < 0) {
1217
6
            hts_log_debug("Inflate block operation failed for "
1218
6
                          "block at offset %"PRId64": %s",
1219
6
                          block_address, bgzf_zerr(count, NULL));
1220
6
            fp->errcode |= BGZF_ERR_ZLIB;
1221
6
            return -1;
1222
6
        }
1223
93
        fp->last_block_eof = (count == 0);
1224
93
        if ( count ) break;     // otherwise an empty bgzf block
1225
6
        block_address = bgzf_htell(fp); // update for new block start
1226
6
    }
1227
87
    if (fp->block_length != 0) fp->block_offset = 0; // Do not reset offset if this read follows a seek.
1228
87
    fp->block_address = block_address;
1229
87
    fp->block_length = count;
1230
87
    if ( fp->idx_build_otf )
1231
0
    {
1232
0
        bgzf_index_add_block(fp);
1233
0
        fp->idx->ublock_addr += count;
1234
0
    }
1235
87
    cache_block(fp, size);
1236
87
    return 0;
1237
1.34k
}
1238
1239
ssize_t bgzf_read(BGZF *fp, void *data, size_t length)
1240
18.3k
{
1241
18.3k
    ssize_t bytes_read = 0;
1242
18.3k
    uint8_t *output = (uint8_t*)data;
1243
18.3k
    if (length <= 0) return 0;
1244
16.0k
    assert(fp->is_write == 0);
1245
31.6k
    while (bytes_read < length) {
1246
16.9k
        int copy_length, available = fp->block_length - fp->block_offset;
1247
16.9k
        uint8_t *buffer;
1248
16.9k
        if (available <= 0) {
1249
4.91k
            int ret = bgzf_read_block(fp);
1250
4.91k
            if (ret != 0) {
1251
9
                hts_log_error("Read block operation failed with error %d after %zd of %zu bytes", fp->errcode, bytes_read, length);
1252
9
                fp->errcode |= BGZF_ERR_ZLIB;
1253
9
                return -1;
1254
9
            }
1255
4.90k
            available = fp->block_length - fp->block_offset;
1256
4.90k
            if (available == 0) {
1257
1.36k
                if (fp->block_length == 0)
1258
1.36k
                    break; // EOF
1259
1260
                // Offset was at end of block (see commit e9863a0)
1261
0
                fp->block_address = bgzf_htell(fp);
1262
0
                fp->block_offset = fp->block_length = 0;
1263
0
                continue;
1264
3.54k
            } else if (available < 0) {
1265
                // Block offset was set to an invalid coordinate
1266
0
                hts_log_error("BGZF block offset %d set beyond block size %d",
1267
0
                              fp->block_offset, fp->block_length);
1268
0
                fp->errcode |= BGZF_ERR_MISUSE;
1269
0
                return -1;
1270
0
            }
1271
4.90k
        }
1272
15.5k
        copy_length = length - bytes_read < available? length - bytes_read : available;
1273
15.5k
        buffer = (uint8_t*)fp->uncompressed_block;
1274
15.5k
        memcpy(output, buffer + fp->block_offset, copy_length);
1275
15.5k
        fp->block_offset += copy_length;
1276
15.5k
        output += copy_length;
1277
15.5k
        bytes_read += copy_length;
1278
1279
        // For raw gzip streams this avoids short reads.
1280
15.5k
        if (fp->block_offset == fp->block_length) {
1281
1.92k
            fp->block_address = bgzf_htell(fp);
1282
1.92k
            fp->block_offset = fp->block_length = 0;
1283
1.92k
        }
1284
15.5k
    }
1285
1286
16.0k
    fp->uncompressed_address += bytes_read;
1287
1288
16.0k
    return bytes_read;
1289
16.0k
}
1290
1291
// -1 for EOF, -2 for error, 0-255 for byte.
1292
87.1k
int bgzf_peek(BGZF *fp) {
1293
87.1k
    int available = fp->block_length - fp->block_offset;
1294
87.1k
    if (available <= 0) {
1295
12
        if (bgzf_read_block(fp) < 0) {
1296
3
            hts_log_error("Read block operation failed with error %d", fp->errcode);
1297
3
            fp->errcode = BGZF_ERR_ZLIB;
1298
3
            return -2;
1299
3
        }
1300
12
    }
1301
87.1k
    available = fp->block_length - fp->block_offset;
1302
87.1k
    if (available)
1303
87.1k
        return ((unsigned char *)fp->uncompressed_block)[fp->block_offset];
1304
1305
0
    return -1;
1306
87.1k
}
1307
1308
ssize_t bgzf_raw_read(BGZF *fp, void *data, size_t length)
1309
0
{
1310
0
    ssize_t ret = hread(fp->fp, data, length);
1311
0
    if (ret < 0) fp->errcode |= BGZF_ERR_IO;
1312
0
    return ret;
1313
0
}
1314
1315
#ifdef BGZF_MT
1316
1317
/* Function to clean up when jobs are discarded (e.g. during seek)
1318
 * This works for results too, as results are the same struct with
1319
 * decompressed data stored in it. */
1320
0
static void job_cleanup(void *arg) {
1321
0
    bgzf_job *j = (bgzf_job *)arg;
1322
0
    mtaux_t *mt = j->fp->mt;
1323
0
    pthread_mutex_lock(&mt->job_pool_m);
1324
0
    pool_free(mt->job_pool, j);
1325
0
    pthread_mutex_unlock(&mt->job_pool_m);
1326
0
}
1327
1328
0
static void *bgzf_encode_func(void *arg) {
1329
0
    bgzf_job *j = (bgzf_job *)arg;
1330
1331
0
    j->comp_len = BGZF_MAX_BLOCK_SIZE;
1332
0
    int ret = bgzf_compress(j->comp_data, &j->comp_len,
1333
0
                            j->uncomp_data, j->uncomp_len,
1334
0
                            j->fp->compress_level);
1335
0
    if (ret != 0)
1336
0
        j->errcode |= BGZF_ERR_ZLIB;
1337
1338
0
    return arg;
1339
0
}
1340
1341
// Optimisation for compression level 0 (uncompressed deflate blocks)
1342
// Avoids memcpy of the data from uncompressed to compressed buffer.
1343
0
static void *bgzf_encode_level0_func(void *arg) {
1344
0
    bgzf_job *j = (bgzf_job *)arg;
1345
0
    uint32_t crc;
1346
0
    j->comp_len = j->uncomp_len + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH + 5;
1347
1348
    // Data will have already been copied in to
1349
    // j->comp_data + BLOCK_HEADER_LENGTH + 5
1350
1351
    // Add preamble
1352
0
    memcpy(j->comp_data, g_magic, BLOCK_HEADER_LENGTH);
1353
0
    u16_to_le(j->comp_len-1, j->comp_data + 16);
1354
1355
    // Deflate uncompressed data header
1356
0
    j->comp_data[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
1357
0
    u16_to_le(j->uncomp_len, j->comp_data + BLOCK_HEADER_LENGTH + 1);
1358
0
    u16_to_le(~j->uncomp_len, j->comp_data + BLOCK_HEADER_LENGTH + 3);
1359
1360
    // Trailer (CRC, uncompressed length)
1361
0
    crc = hts_crc32(0, j->comp_data + BLOCK_HEADER_LENGTH + 5, j->uncomp_len);
1362
0
    u32_to_le(crc, j->comp_data +  j->comp_len - 8);
1363
0
    u32_to_le(j->uncomp_len, j->comp_data + j->comp_len - 4);
1364
1365
0
    return arg;
1366
0
}
1367
1368
// Our input block has already been decoded by bgzf_mt_read_block().
1369
// We need to split that into a fetch block (compressed) and make this
1370
// do the actual decompression step.
1371
0
static void *bgzf_decode_func(void *arg) {
1372
0
    bgzf_job *j = (bgzf_job *)arg;
1373
1374
0
    j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1375
0
    uint32_t crc = le_to_u32((uint8_t *)j->comp_data + j->comp_len-8);
1376
0
    int ret = bgzf_uncompress(j->uncomp_data, &j->uncomp_len,
1377
0
                              j->comp_data+18, j->comp_len-18, crc);
1378
0
    if (ret != 0)
1379
0
        j->errcode |= BGZF_ERR_ZLIB;
1380
1381
0
    return arg;
1382
0
}
1383
1384
/*
1385
 * Nul function so we can dispatch a job with the correct serial
1386
 * to mark failure or to indicate an empty read (EOF).
1387
 */
1388
0
static void *bgzf_nul_func(void *arg) { return arg; }
1389
1390
/*
1391
 * Takes compressed blocks off the results queue and calls hwrite to
1392
 * punt them to the output stream.
1393
 *
1394
 * Returns NULL when no more are left, or -1 on error
1395
 */
1396
0
static void *bgzf_mt_writer(void *vp) {
1397
0
    BGZF *fp = (BGZF *)vp;
1398
0
    mtaux_t *mt = fp->mt;
1399
0
    hts_tpool_result *r;
1400
1401
0
    if (fp->idx_build_otf) {
1402
0
        fp->idx->moffs = fp->idx->noffs = 1;
1403
0
        fp->idx->offs = (bgzidx1_t*) calloc(fp->idx->moffs, sizeof(bgzidx1_t));
1404
0
        if (!fp->idx->offs) goto err;
1405
0
    }
1406
1407
    // Iterates until result queue is shutdown, where it returns NULL.
1408
0
    while ((r = hts_tpool_next_result_wait(mt->out_queue))) {
1409
0
        bgzf_job *j = (bgzf_job *)hts_tpool_result_data(r);
1410
0
        assert(j);
1411
1412
0
        if (fp->idx_build_otf) {
1413
0
            fp->idx->noffs++;
1414
0
            if ( fp->idx->noffs > fp->idx->moffs )
1415
0
            {
1416
0
                fp->idx->moffs = fp->idx->noffs;
1417
0
                kroundup32(fp->idx->moffs);
1418
0
                fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
1419
0
                if ( !fp->idx->offs ) goto err;
1420
0
            }
1421
0
            fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->offs[ fp->idx->noffs-2 ].uaddr + j->uncomp_len;
1422
0
            fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->idx->offs[ fp->idx->noffs-2 ].caddr + j->comp_len;
1423
0
        }
1424
1425
        // Flush any cached hts_idx_push calls
1426
0
        if (bgzf_idx_flush(fp, j->uncomp_len, j->comp_len) < 0)
1427
0
            goto err;
1428
1429
0
        if (hwrite(fp->fp, j->comp_data, j->comp_len) != j->comp_len)
1430
0
            goto err;
1431
1432
        // Update our local block_address.  We cannot update fp->block_address
1433
        // directly because bgzf_tell reads it without locking.
1434
0
        pthread_mutex_lock(&mt->idx_m);
1435
0
        mt->block_address += j->comp_len;
1436
0
        pthread_mutex_unlock(&mt->idx_m);
1437
1438
        /*
1439
         * Periodically call hflush (which calls fsync when on a file).
1440
         * This avoids the fsync being done at the bgzf_close stage,
1441
         * which can sometimes cause significant delays.  As this is in
1442
         * a separate thread, spreading the sync delays throughout the
1443
         * program execution seems better.
1444
         * Frequency of 1/512 has been chosen by experimentation
1445
         * across local XFS, NFS and Lustre tests.
1446
         */
1447
0
        if (++mt->flush_pending % 512 == 0)
1448
0
            if (hflush(fp->fp) != 0)
1449
0
                goto err;
1450
1451
1452
0
        hts_tpool_delete_result(r, 0);
1453
1454
        // Also updated by main thread
1455
0
        pthread_mutex_lock(&mt->job_pool_m);
1456
0
        pool_free(mt->job_pool, j);
1457
0
        mt->jobs_pending--;
1458
0
        pthread_mutex_unlock(&mt->job_pool_m);
1459
0
    }
1460
1461
0
    if (hflush(fp->fp) != 0)
1462
0
        goto err;
1463
1464
0
    hts_tpool_process_destroy(mt->out_queue);
1465
1466
0
    return NULL;
1467
1468
0
 err:
1469
0
    hts_tpool_process_destroy(mt->out_queue);
1470
0
    return (void *)-1;
1471
0
}
1472
1473
1474
/*
1475
 * Reads a compressed block of data using hread and dispatches it to
1476
 * the thread pool for decompression.  This is the analogue of the old
1477
 * non-threaded bgzf_read_block() function, but without modifying fp
1478
 * in any way (except for the read offset).  All output goes via the
1479
 * supplied bgzf_job struct.
1480
 *
1481
 * Returns 0 on success, or -1 on error or end of file.
1482
 */
1483
int bgzf_mt_read_block(BGZF *fp, bgzf_job *j)
1484
0
{
1485
0
    uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
1486
0
    int count, block_length, remaining;
1487
1488
    // NOTE: Guaranteed to be compressed as we block multi-threading in
1489
    // uncompressed mode.  However, it may be gzip compression instead
1490
    // of bgzf.
1491
1492
    // Reading compressed file
1493
0
    int64_t block_address;
1494
0
    block_address = htell(fp->fp);
1495
1496
0
    j->block_address = block_address;  // in case we exit with j->errcode
1497
1498
0
    if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
1499
0
    count = hpeek(fp->fp, header, sizeof(header));
1500
0
    if (count == 0) // no data read
1501
0
        return -1;
1502
0
    int ret;
1503
0
    if ( count != sizeof(header) || (ret=check_header(header))==-2 )
1504
0
    {
1505
0
        j->errcode |= BGZF_ERR_HEADER;
1506
0
        return -1;
1507
0
    }
1508
0
    if (ret == -1) {
1509
0
        j->errcode |= BGZF_ERR_MT;
1510
0
        return -1;
1511
0
    }
1512
1513
0
    count = hread(fp->fp, header, sizeof(header));
1514
0
    if (count != sizeof(header)) // no data read
1515
0
        return -1;
1516
1517
0
    block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1518
0
    if (block_length < BLOCK_HEADER_LENGTH) {
1519
0
        j->errcode |= BGZF_ERR_HEADER;
1520
0
        return -1;
1521
0
    }
1522
0
    compressed_block = (uint8_t*)j->comp_data;
1523
0
    memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1524
0
    remaining = block_length - BLOCK_HEADER_LENGTH;
1525
0
    count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1526
0
    if (count != remaining) {
1527
0
        j->errcode |= BGZF_ERR_IO;
1528
0
        return -1;
1529
0
    }
1530
0
    j->comp_len = block_length;
1531
0
    j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1532
0
    j->block_address = block_address;
1533
0
    j->fp = fp;
1534
0
    j->errcode = 0;
1535
1536
0
    return 0;
1537
0
}
1538
1539
1540
static int bgzf_check_EOF_common(BGZF *fp)
1541
2.87k
{
1542
2.87k
    uint8_t buf[28];
1543
2.87k
    off_t offset = htell(fp->fp);
1544
2.87k
    if (hseek(fp->fp, -28, SEEK_END) < 0) {
1545
774
        if (errno == ESPIPE) { hclearerr(fp->fp); return 2; }
1546
#ifdef _WIN32
1547
        if (errno == EINVAL) { hclearerr(fp->fp); return 2; }
1548
#else
1549
        // Assume that EINVAL was due to the file being less than 28 bytes
1550
        // long, rather than being a random error return from an hfile backend.
1551
        // This should be reported as "no EOF block" rather than an error.
1552
774
        if (errno == EINVAL) { hclearerr(fp->fp); return 0; }
1553
0
#endif
1554
0
        return -1;
1555
774
    }
1556
2.10k
    if ( hread(fp->fp, buf, 28) != 28 ) return -1;
1557
2.10k
    if ( hseek(fp->fp, offset, SEEK_SET) < 0 ) return -1;
1558
2.10k
    return (memcmp("\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", buf, 28) == 0)? 1 : 0;
1559
2.10k
}
1560
1561
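Illustrative sketch (not part of bgzf.c): a standalone check for the 28-byte BGZF EOF marker that bgzf_check_EOF_common() looks for, using stdio instead of the hFILE layer; the input path is an assumed example.

#include <stdio.h>
#include <string.h>

static const unsigned char bgzf_eof[28] =
    "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0";

int main(void) {
    unsigned char buf[28];
    FILE *f = fopen("example.bam", "rb");       /* assumed example file */
    if (!f) return 1;
    int ok = fseek(f, -28L, SEEK_END) == 0 &&
             fread(buf, 1, 28, f) == 28 &&
             memcmp(buf, bgzf_eof, 28) == 0;
    fclose(f);
    puts(ok ? "EOF block present" : "EOF block absent or file too short");
    return ok ? 0 : 1;
}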
/*
1562
 * Checks EOF from the reader thread.
1563
 */
1564
0
static void bgzf_mt_eof(BGZF *fp) {
1565
0
    mtaux_t *mt = fp->mt;
1566
1567
0
    pthread_mutex_lock(&mt->job_pool_m);
1568
0
    mt->eof = bgzf_check_EOF_common(fp);
1569
0
    pthread_mutex_unlock(&mt->job_pool_m);
1570
0
    mt->command = HAS_EOF_DONE;
1571
0
    pthread_cond_signal(&mt->command_c);
1572
0
}
1573
1574
1575
/*
1576
 * Performs the seek (called by reader thread).
1577
 *
1578
 * This simply drains the entire queue (throwing away blocks), seeks,
1579
 * and starts it up again.  Brute force, but maybe sufficient.
1580
 */
1581
0
static void bgzf_mt_seek(BGZF *fp) {
1582
0
    mtaux_t *mt = fp->mt;
1583
1584
0
    hts_tpool_process_reset(mt->out_queue, 0);
1585
0
    pthread_mutex_lock(&mt->job_pool_m);
1586
0
    mt->errcode = 0;
1587
1588
0
    if (hseek(fp->fp, mt->block_address, SEEK_SET) < 0)
1589
0
        mt->errcode = errno;
1590
1591
0
    pthread_mutex_unlock(&mt->job_pool_m);
1592
0
    mt->command = SEEK_DONE;
1593
0
    pthread_cond_signal(&mt->command_c);
1594
0
}
1595
1596
0
static void *bgzf_mt_reader(void *vp) {
1597
0
    BGZF *fp = (BGZF *)vp;
1598
0
    mtaux_t *mt = fp->mt;
1599
1600
0
restart:
1601
0
    pthread_mutex_lock(&mt->job_pool_m);
1602
0
    bgzf_job *j = pool_alloc(mt->job_pool);
1603
0
    pthread_mutex_unlock(&mt->job_pool_m);
1604
0
    if (!j) goto err;
1605
0
    j->errcode = 0;
1606
0
    j->comp_len = 0;
1607
0
    j->uncomp_len = 0;
1608
0
    j->hit_eof = 0;
1609
0
    j->fp = fp;
1610
1611
0
    while (bgzf_mt_read_block(fp, j) == 0) {
1612
        // Dispatch
1613
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_decode_func, j,
1614
0
                                job_cleanup, job_cleanup, 0) < 0) {
1615
0
            job_cleanup(j);
1616
0
            goto err;
1617
0
        }
1618
1619
        // Check for command
1620
0
        pthread_mutex_lock(&mt->command_m);
1621
0
        switch (mt->command) {
1622
0
        case SEEK:
1623
0
            bgzf_mt_seek(fp);  // Sets mt->command to SEEK_DONE
1624
0
            pthread_mutex_unlock(&mt->command_m);
1625
0
            goto restart;
1626
1627
0
        case HAS_EOF:
1628
0
            bgzf_mt_eof(fp);   // Sets mt->command to HAS_EOF_DONE
1629
0
            break;
1630
1631
0
        case SEEK_DONE:
1632
0
        case HAS_EOF_DONE:
1633
0
            pthread_cond_signal(&mt->command_c);
1634
0
            break;
1635
1636
0
        case CLOSE:
1637
0
            pthread_cond_signal(&mt->command_c);
1638
0
            pthread_mutex_unlock(&mt->command_m);
1639
0
            hts_tpool_process_destroy(mt->out_queue);
1640
0
            return NULL;
1641
1642
0
        default:
1643
0
            break;
1644
0
        }
1645
0
        pthread_mutex_unlock(&mt->command_m);
1646
1647
        // Allocate buffer for next block
1648
0
        pthread_mutex_lock(&mt->job_pool_m);
1649
0
        j = pool_alloc(mt->job_pool);
1650
0
        pthread_mutex_unlock(&mt->job_pool_m);
1651
0
        if (!j) {
1652
0
            hts_tpool_process_destroy(mt->out_queue);
1653
0
            return NULL;
1654
0
        }
1655
0
        j->errcode = 0;
1656
0
        j->comp_len = 0;
1657
0
        j->uncomp_len = 0;
1658
0
        j->hit_eof = 0;
1659
0
        j->fp = fp;
1660
0
    }
1661
1662
0
    if (j->errcode == BGZF_ERR_MT) {
1663
        // A raw gzip stream cannot be decoded by the multi-threaded code.
1664
        // We tear down the multi-threaded decoder and revert to the old code.
1665
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_nul_func, j,
1666
0
                                job_cleanup, job_cleanup, 0) < 0) {
1667
0
            job_cleanup(j);
1668
0
            hts_tpool_process_destroy(mt->out_queue);
1669
0
            return NULL;
1670
0
        }
1671
0
        hts_tpool_process_ref_decr(mt->out_queue);
1672
0
        return &j->errcode;
1673
0
    }
1674
1675
    // Dispatch an empty block so EOF is spotted.
1676
    // We also use this mechanism for returning errors, in which case
1677
    // j->errcode is set already.
1678
1679
0
    j->hit_eof = 1;
1680
0
    if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_nul_func, j,
1681
0
                            job_cleanup, job_cleanup, 0) < 0) {
1682
0
        job_cleanup(j);
1683
0
        hts_tpool_process_destroy(mt->out_queue);
1684
0
        return NULL;
1685
0
    }
1686
0
    if (j->errcode != 0) {
1687
0
        hts_tpool_process_destroy(mt->out_queue);
1688
0
        return &j->errcode;
1689
0
    }
1690
1691
    // We hit EOF so we can stop reading, but we may get a subsequent
1692
    // seek request.  In this case we need to restart the reader.
1693
    //
1694
    // To handle this we wait on a condition variable and then
1695
    // monitor the command. (This could be either seek or close.)
1696
0
    for (;;) {
1697
0
        pthread_mutex_lock(&mt->command_m);
1698
0
        if (mt->command == NONE)
1699
0
            pthread_cond_wait(&mt->command_c, &mt->command_m);
1700
0
        switch(mt->command) {
1701
0
        default:
1702
0
            pthread_mutex_unlock(&mt->command_m);
1703
0
            break;
1704
1705
0
        case SEEK:
1706
0
            bgzf_mt_seek(fp);
1707
0
            pthread_mutex_unlock(&mt->command_m);
1708
0
            goto restart;
1709
1710
0
        case HAS_EOF:
1711
0
            bgzf_mt_eof(fp);   // Sets mt->command to HAS_EOF_DONE
1712
0
            pthread_mutex_unlock(&mt->command_m);
1713
0
            break;
1714
1715
0
        case SEEK_DONE:
1716
0
        case HAS_EOF_DONE:
1717
0
            pthread_cond_signal(&mt->command_c);
1718
0
            pthread_mutex_unlock(&mt->command_m);
1719
0
            break;
1720
1721
0
        case CLOSE:
1722
0
            pthread_cond_signal(&mt->command_c);
1723
0
            pthread_mutex_unlock(&mt->command_m);
1724
0
            hts_tpool_process_destroy(mt->out_queue);
1725
0
            return NULL;
1726
0
        }
1727
0
    }
1728
1729
0
 err:
1730
0
    pthread_mutex_lock(&mt->command_m);
1731
0
    mt->command = CLOSE;
1732
0
    pthread_cond_signal(&mt->command_c);
1733
0
    pthread_mutex_unlock(&mt->command_m);
1734
0
    hts_tpool_process_destroy(mt->out_queue);
1735
0
    return NULL;
1736
0
}
1737
1738
0
int bgzf_thread_pool(BGZF *fp, hts_tpool *pool, int qsize) {
1739
    // No gain from multi-threading when not compressed
1740
0
    if (!fp->is_compressed)
1741
0
        return 0;
1742
1743
0
    mtaux_t *mt;
1744
0
    mt = (mtaux_t*)calloc(1, sizeof(mtaux_t));
1745
0
    if (!mt) return -1;
1746
0
    fp->mt = mt;
1747
1748
0
    mt->pool = pool;
1749
0
    mt->n_threads = hts_tpool_size(pool);
1750
0
    if (!qsize)
1751
0
        qsize = mt->n_threads*2;
1752
0
    if (!(mt->out_queue = hts_tpool_process_init(mt->pool, qsize, 0)))
1753
0
        goto err;
1754
0
    hts_tpool_process_ref_incr(mt->out_queue);
1755
1756
0
    mt->job_pool = pool_create(sizeof(bgzf_job));
1757
0
    if (!mt->job_pool)
1758
0
        goto err;
1759
1760
0
    pthread_mutex_init(&mt->job_pool_m, NULL);
1761
0
    pthread_mutex_init(&mt->command_m, NULL);
1762
0
    pthread_mutex_init(&mt->idx_m, NULL);
1763
0
    pthread_cond_init(&mt->command_c, NULL);
1764
0
    mt->flush_pending = 0;
1765
0
    mt->jobs_pending = 0;
1766
0
    mt->free_block = fp->uncompressed_block; // currently in-use block
1767
0
    mt->block_address = fp->block_address;
1768
0
    pthread_create(&mt->io_task, NULL,
1769
0
                   fp->is_write ? bgzf_mt_writer : bgzf_mt_reader, fp);
1770
1771
0
    return 0;
1772
1773
0
 err:
1774
0
    free(mt);
1775
0
    fp->mt = NULL;
1776
0
    return -1;
1777
0
}
1778
1779
int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1780
0
{
1781
    // No gain from multi-threading when not compressed
1782
0
    if (!fp->is_compressed || fp->is_gzip)
1783
0
        return 0;
1784
1785
0
    if (n_threads < 1) return -1;
1786
0
    hts_tpool *p = hts_tpool_init(n_threads);
1787
0
    if (!p)
1788
0
        return -1;
1789
1790
0
    if (bgzf_thread_pool(fp, p, 0) != 0) {
1791
0
        hts_tpool_destroy(p);
1792
0
        return -1;
1793
0
    }
1794
1795
0
    fp->mt->own_pool = 1;
1796
1797
0
    return 0;
1798
0
}
1799
1800
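Illustrative usage sketch for the functions above (output path, thread count and record contents are assumed examples): enable multi-threaded BGZF compression with bgzf_mt(); alternatively, an existing hts_tpool can be shared between several files via bgzf_thread_pool().

#include <string.h>
#include "htslib/bgzf.h"

int write_threaded(void) {
    BGZF *fp = bgzf_open("out.vcf.gz", "w");    /* assumed example output path */
    if (!fp) return -1;
    if (bgzf_mt(fp, 4, 0) < 0) {                /* 4 compression threads        */
        bgzf_close(fp);
        return -1;
    }
    const char line[] = "##fileformat=VCFv4.3\n";
    if (bgzf_write(fp, line, strlen(line)) < 0) {
        bgzf_close(fp);
        return -1;
    }
    return bgzf_close(fp);                      /* flushes and writes the EOF block */
}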
static int mt_destroy(mtaux_t *mt)
1801
0
{
1802
0
    int ret = 0;
1803
1804
    // Tell the reader to shut down
1805
0
    pthread_mutex_lock(&mt->command_m);
1806
0
    mt->command = CLOSE;
1807
0
    pthread_cond_signal(&mt->command_c);
1808
0
    hts_tpool_wake_dispatch(mt->out_queue); // unstick the reader
1809
0
    pthread_mutex_unlock(&mt->command_m);
1810
1811
    // Check for thread worker failure, indicated by is_shutdown returning 2
1812
    // It's possible really late errors might be missed, but we can live with
1813
    // that.
1814
0
    ret = -(hts_tpool_process_is_shutdown(mt->out_queue) > 1);
1815
    // Destroying the queue first forces the writer to exit.
1816
    // mt->out_queue is reference counted, so destroy gets called in both
1817
    // this and the IO threads.  The last to do it will clean up.
1818
0
    hts_tpool_process_destroy(mt->out_queue);
1819
1820
    // IO thread will now exit.  Wait for it and perform final clean-up.
1821
    // If it returned non-NULL, it was not happy.
1822
0
    void *retval = NULL;
1823
0
    pthread_join(mt->io_task, &retval);
1824
0
    ret = retval != NULL ? -1 : ret;
1825
1826
0
    pthread_mutex_destroy(&mt->job_pool_m);
1827
0
    pthread_mutex_destroy(&mt->command_m);
1828
0
    pthread_mutex_destroy(&mt->idx_m);
1829
0
    pthread_cond_destroy(&mt->command_c);
1830
0
    if (mt->curr_job)
1831
0
        pool_free(mt->job_pool, mt->curr_job);
1832
1833
0
    if (mt->own_pool)
1834
0
        hts_tpool_destroy(mt->pool);
1835
1836
0
    pool_destroy(mt->job_pool);
1837
1838
0
    if (mt->idx_cache.e)
1839
0
        free(mt->idx_cache.e);
1840
1841
0
    free(mt);
1842
0
    fflush(stderr);
1843
1844
0
    return ret;
1845
0
}
1846
1847
static int mt_queue(BGZF *fp)
1848
0
{
1849
0
    mtaux_t *mt = fp->mt;
1850
1851
0
    mt->block_number++;
1852
1853
    // Also updated by writer thread
1854
0
    pthread_mutex_lock(&mt->job_pool_m);
1855
0
    bgzf_job *j = pool_alloc(mt->job_pool);
1856
0
    if (j) mt->jobs_pending++;
1857
0
    pthread_mutex_unlock(&mt->job_pool_m);
1858
0
    if (!j) return -1;
1859
1860
0
    j->fp = fp;
1861
0
    j->errcode = 0;
1862
0
    j->uncomp_len  = fp->block_offset;
1863
0
    if (fp->compress_level == 0) {
1864
0
        memcpy(j->comp_data + BLOCK_HEADER_LENGTH + 5, fp->uncompressed_block,
1865
0
               j->uncomp_len);
1866
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue,
1867
0
                                bgzf_encode_level0_func, j,
1868
0
                                job_cleanup, job_cleanup, 0) < 0) {
1869
0
            goto fail;
1870
0
        }
1871
0
    } else {
1872
0
        memcpy(j->uncomp_data, fp->uncompressed_block, j->uncomp_len);
1873
1874
        // Do we need a non-blocking version & jobs_pending tracking here?
1875
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_encode_func, j,
1876
0
                                job_cleanup, job_cleanup, 0) < 0) {
1877
0
            goto fail;
1878
0
        }
1879
0
    }
1880
1881
0
    fp->block_offset = 0;
1882
0
    return 0;
1883
1884
0
 fail:
1885
0
    job_cleanup(j);
1886
0
    pthread_mutex_lock(&mt->job_pool_m);
1887
0
    mt->jobs_pending--;
1888
0
    pthread_mutex_unlock(&mt->job_pool_m);
1889
0
    return -1;
1890
0
}
1891
1892
static int mt_flush_queue(BGZF *fp)
1893
0
{
1894
0
    mtaux_t *mt = fp->mt;
1895
1896
    // Drain the encoder jobs.
1897
    // We cannot use hts_tpool_flush here as it can cause deadlock if
1898
    // the queue is full of decoder tasks.  The best solution would
1899
    // be to have one input queue per type of job, but we don't right now.
1900
    //hts_tpool_flush(mt->pool);
1901
0
    pthread_mutex_lock(&mt->job_pool_m);
1902
0
    int shutdown = 0;
1903
0
    while (mt->jobs_pending != 0) {
1904
0
        if ((shutdown = hts_tpool_process_is_shutdown(mt->out_queue)))
1905
0
            break;
1906
0
        pthread_mutex_unlock(&mt->job_pool_m);
1907
0
        hts_usleep(10000); // FIXME: replace by condition variable
1908
0
        pthread_mutex_lock(&mt->job_pool_m);
1909
0
    }
1910
0
    pthread_mutex_unlock(&mt->job_pool_m);
1911
1912
0
    if (shutdown)
1913
0
        return -1;
1914
1915
    // Wait on bgzf_mt_writer to drain the queue
1916
0
    if (hts_tpool_process_flush(mt->out_queue) != 0)
1917
0
        return -1;
1918
1919
0
    return (fp->errcode == 0)? 0 : -1;
1920
0
}
1921
1922
static int lazy_flush(BGZF *fp)
1923
17.9k
{
1924
17.9k
    if (fp->mt)
1925
0
        return fp->block_offset ? mt_queue(fp) : 0;
1926
17.9k
    else
1927
17.9k
        return bgzf_flush(fp);
1928
17.9k
}
1929
1930
#else  // ~ #ifdef BGZF_MT
1931
1932
int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1933
{
1934
    return 0;
1935
}
1936
1937
static inline int lazy_flush(BGZF *fp)
1938
{
1939
    return bgzf_flush(fp);
1940
}
1941
1942
#endif // ~ #ifdef BGZF_MT
1943
1944
int bgzf_flush(BGZF *fp)
1945
40.2k
{
1946
40.2k
    if (!fp->is_write) return 0;
1947
40.2k
#ifdef BGZF_MT
1948
40.2k
    if (fp->mt) {
1949
0
        int ret = 0;
1950
0
        if (fp->block_offset) ret = mt_queue(fp);
1951
0
        if (!ret) ret = mt_flush_queue(fp);
1952
1953
        // We maintain mt->block_address when threading as the
1954
        // main code can call bgzf_tell without any locks.
1955
        // (The results from bgzf_tell are wrong, but we only care about the last
1956
        // 16 bits' worth, except for the final flush process.)
1957
0
        pthread_mutex_lock(&fp->mt->idx_m);
1958
0
        fp->block_address = fp->mt->block_address;
1959
0
        pthread_mutex_unlock(&fp->mt->idx_m);
1960
1961
0
        return ret;
1962
0
    }
1963
40.2k
#endif
1964
1965
40.2k
    if (!fp->is_compressed) {
1966
0
        return hflush(fp->fp);
1967
0
    }
1968
1969
72.7k
    while (fp->block_offset > 0) {
1970
32.5k
        int block_length;
1971
32.5k
        if ( fp->idx_build_otf )
1972
0
        {
1973
0
            bgzf_index_add_block(fp);
1974
0
            fp->idx->ublock_addr += fp->block_offset;
1975
0
        }
1976
32.5k
        block_length = deflate_block(fp, fp->block_offset);
1977
32.5k
        if (block_length < 0) {
1978
0
            hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length, NULL));
1979
0
            return -1;
1980
0
        }
1981
32.5k
        if (hwrite(fp->fp, fp->compressed_block, block_length) != block_length) {
1982
0
            hts_log_error("File write failed (wrong size)");
1983
0
            fp->errcode |= BGZF_ERR_IO; // possibly truncated file
1984
0
            return -1;
1985
0
        }
1986
32.5k
        fp->block_address += block_length;
1987
32.5k
    }
1988
40.2k
    return 0;
1989
40.2k
}
1990
1991
int bgzf_flush_try(BGZF *fp, ssize_t size)
1992
1.51M
{
1993
1.51M
    if (fp->block_offset + size > BGZF_BLOCK_SIZE) return lazy_flush(fp);
1994
1.51M
    return 0;
1995
1.51M
}
1996
1997
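Illustrative sketch (record layout assumed): how callers typically pair bgzf_flush_try() with bgzf_write() so that a record starts in a fresh block when it would otherwise straddle the current one.

#include "htslib/bgzf.h"

/* Flush the pending block first if appending `len` bytes would exceed
 * BGZF_BLOCK_SIZE, then write the record. */
static int write_record(BGZF *fp, const void *rec, size_t len) {
    if (bgzf_flush_try(fp, len) < 0) return -1;
    return bgzf_write(fp, rec, len) < 0 ? -1 : 0;
}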
ssize_t bgzf_write(BGZF *fp, const void *data, size_t length)
1998
57.8k
{
1999
57.8k
    if ( !fp->is_compressed ) {
2000
0
        size_t push = length + (size_t) fp->block_offset;
2001
0
        fp->block_offset = push % BGZF_MAX_BLOCK_SIZE;
2002
0
        fp->block_address += (push - fp->block_offset);
2003
0
        return hwrite(fp->fp, data, length);
2004
0
    }
2005
2006
57.8k
    const uint8_t *input = (const uint8_t*)data;
2007
57.8k
    ssize_t remaining = length;
2008
57.8k
    assert(fp->is_write);
2009
131k
    while (remaining > 0) {
2010
74.1k
        uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
2011
74.1k
        int copy_length = BGZF_BLOCK_SIZE - fp->block_offset;
2012
74.1k
        if (copy_length > remaining) copy_length = remaining;
2013
74.1k
        memcpy(buffer + fp->block_offset, input, copy_length);
2014
74.1k
        fp->block_offset += copy_length;
2015
74.1k
        input += copy_length;
2016
74.1k
        remaining -= copy_length;
2017
74.1k
        if (fp->block_offset == BGZF_BLOCK_SIZE) {
2018
16.3k
            if (lazy_flush(fp) != 0) return -1;
2019
16.3k
        }
2020
74.1k
    }
2021
57.8k
    return length - remaining;
2022
57.8k
}
2023
2024
ssize_t bgzf_block_write(BGZF *fp, const void *data, size_t length)
2025
0
{
2026
0
    if ( !fp->is_compressed ) {
2027
0
        size_t push = length + (size_t) fp->block_offset;
2028
0
        fp->block_offset = push % BGZF_MAX_BLOCK_SIZE;
2029
0
        fp->block_address += (push - fp->block_offset);
2030
0
        return hwrite(fp->fp, data, length);
2031
0
    }
2032
2033
0
    const uint8_t *input = (const uint8_t*)data;
2034
0
    ssize_t remaining = length;
2035
0
    assert(fp->is_write);
2036
0
    uint64_t current_block; //keep track of current block
2037
0
    uint64_t ublock_size; // amount of uncompressed data to be fed into next block
2038
0
    while (remaining > 0) {
2039
0
        current_block = fp->idx->moffs - fp->idx->noffs;
2040
0
        ublock_size = current_block + 1 < fp->idx->moffs ? fp->idx->offs[current_block+1].uaddr-fp->idx->offs[current_block].uaddr : BGZF_MAX_BLOCK_SIZE;
2041
0
        uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
2042
0
        int copy_length = ublock_size - fp->block_offset;
2043
0
        if (copy_length > remaining) copy_length = remaining;
2044
0
        memcpy(buffer + fp->block_offset, input, copy_length);
2045
0
        fp->block_offset += copy_length;
2046
0
        input += copy_length;
2047
0
        remaining -= copy_length;
2048
0
        if (fp->block_offset == ublock_size) {
2049
0
            if (lazy_flush(fp) != 0) return -1;
2050
0
            if (fp->idx->noffs > 0)
2051
0
                fp->idx->noffs--;  // decrement noffs to track the blocks
2052
0
        }
2053
0
    }
2054
0
    return length - remaining;
2055
0
}
2056
2057
2058
ssize_t bgzf_raw_write(BGZF *fp, const void *data, size_t length)
2059
0
{
2060
0
    ssize_t ret = hwrite(fp->fp, data, length);
2061
0
    if (ret < 0) fp->errcode |= BGZF_ERR_IO;
2062
0
    return ret;
2063
0
}
2064
2065
// Helper function for tidying up fp->mt and setting errcode
2066
18.5k
static void bgzf_close_mt(BGZF *fp) {
2067
18.5k
    if (fp->mt) {
2068
0
        if (!fp->mt->free_block)
2069
0
            fp->uncompressed_block = NULL;
2070
0
        if (mt_destroy(fp->mt) < 0)
2071
0
            fp->errcode = BGZF_ERR_IO;
2072
0
    }
2073
18.5k
}
2074
2075
int bgzf_close(BGZF* fp)
2076
18.8k
{
2077
18.8k
    int ret, block_length;
2078
18.8k
    if (fp == 0) return -1;
2079
18.5k
    if (fp->is_write && fp->is_compressed) {
2080
11.7k
        if (bgzf_flush(fp) != 0) {
2081
0
            bgzf_close_mt(fp);
2082
0
            return -1;
2083
0
        }
2084
11.7k
        fp->compress_level = -1;
2085
11.7k
        block_length = deflate_block(fp, 0); // write an empty block
2086
11.7k
        if (block_length < 0) {
2087
0
            hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length, NULL));
2088
0
            bgzf_close_mt(fp);
2089
0
            return -1;
2090
0
        }
2091
11.7k
        if (hwrite(fp->fp, fp->compressed_block, block_length) < 0
2092
11.7k
            || hflush(fp->fp) != 0) {
2093
0
            hts_log_error("File write failed");
2094
0
            fp->errcode |= BGZF_ERR_IO;
2095
0
            return -1;
2096
0
        }
2097
11.7k
    }
2098
2099
18.5k
    bgzf_close_mt(fp);
2100
2101
18.5k
    if ( fp->is_gzip )
2102
1.51k
    {
2103
1.51k
        if (fp->gz_stream == NULL) ret = Z_OK;
2104
1.07k
        else if (!fp->is_write) ret = inflateEnd(fp->gz_stream);
2105
0
        else ret = deflateEnd(fp->gz_stream);
2106
1.51k
        if (ret != Z_OK) {
2107
0
            hts_log_error("Call to inflateEnd/deflateEnd failed: %s", bgzf_zerr(ret, NULL));
2108
0
        }
2109
1.51k
        free(fp->gz_stream);
2110
1.51k
    }
2111
18.5k
    ret = hclose(fp->fp);
2112
18.5k
    if (ret != 0) return -1;
2113
18.5k
    bgzf_index_destroy(fp);
2114
18.5k
    free(fp->uncompressed_block);
2115
18.5k
    free_cache(fp);
2116
18.5k
    ret = fp->errcode ? -1 : 0;
2117
18.5k
    free(fp);
2118
18.5k
    return ret;
2119
18.5k
}
2120
2121
void bgzf_set_cache_size(BGZF *fp, int cache_size)
2122
0
{
2123
0
    if (fp && fp->mt) return; // Not appropriate when multi-threading
2124
0
    if (fp && fp->cache) fp->cache_size = cache_size;
2125
0
}
2126
2127
2.87k
int bgzf_check_EOF(BGZF *fp) {
2128
2.87k
    int has_eof;
2129
2130
2.87k
    if (fp->mt) {
2131
0
        pthread_mutex_lock(&fp->mt->command_m);
2132
        // fp->mt->command state transitions should be:
2133
        // NONE -> HAS_EOF -> HAS_EOF_DONE -> NONE
2134
        // (HAS_EOF -> HAS_EOF_DONE happens in bgzf_mt_reader thread)
2135
0
        if (fp->mt->command != CLOSE)
2136
0
            fp->mt->command = HAS_EOF;
2137
0
        pthread_cond_signal(&fp->mt->command_c);
2138
0
        hts_tpool_wake_dispatch(fp->mt->out_queue);
2139
0
        do {
2140
0
            if (fp->mt->command == CLOSE) {
2141
                // possible error in bgzf_mt_reader
2142
0
                pthread_mutex_unlock(&fp->mt->command_m);
2143
0
                return 0;
2144
0
            }
2145
0
            pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
2146
0
            switch (fp->mt->command) {
2147
0
            case HAS_EOF_DONE: break;
2148
0
            case HAS_EOF:
2149
                // Resend signal intended for bgzf_mt_reader()
2150
0
                pthread_cond_signal(&fp->mt->command_c);
2151
0
                break;
2152
0
            case CLOSE:
2153
0
                continue;
2154
0
            default:
2155
0
                abort();  // Should not get to any other state
2156
0
            }
2157
0
        } while (fp->mt->command != HAS_EOF_DONE);
2158
0
        fp->mt->command = NONE;
2159
0
        has_eof = fp->mt->eof;
2160
0
        pthread_mutex_unlock(&fp->mt->command_m);
2161
2.87k
    } else {
2162
2.87k
        has_eof = bgzf_check_EOF_common(fp);
2163
2.87k
    }
2164
2165
2.87k
    fp->no_eof_block = (has_eof == 0);
2166
2167
2.87k
    return has_eof;
2168
2.87k
}
2169
2170
static inline int64_t bgzf_seek_common(BGZF* fp,
2171
                                       int64_t block_address, int block_offset)
2172
0
{
2173
0
    if (fp->mt) {
2174
        // The reader runs asynchronous and does loops of:
2175
        //    Read block
2176
        //    Check & process command
2177
        //    Dispatch decode job
2178
        //
2179
        // Once at EOF it then switches to loops of
2180
        //    Wait for command
2181
        //    Process command (possibly switching back to above loop).
2182
        //
2183
        // To seek we therefore send the reader thread a SEEK command,
2184
        // waking it up if blocked in dispatch and signalling if
2185
        // waiting for a command.  We then wait for the response so we
2186
        // know the seek succeeded.
2187
0
        pthread_mutex_lock(&fp->mt->command_m);
2188
0
        fp->mt->hit_eof = 0;
2189
        // fp->mt->command state transitions should be:
2190
        // NONE -> SEEK -> SEEK_DONE -> NONE
2191
        // (SEEK -> SEEK_DONE happens in bgzf_mt_reader thread)
2192
0
        fp->mt->command = SEEK;
2193
0
        fp->mt->block_address = block_address;
2194
0
        pthread_cond_signal(&fp->mt->command_c);
2195
0
        hts_tpool_wake_dispatch(fp->mt->out_queue);
2196
0
        do {
2197
0
            pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
2198
0
            switch (fp->mt->command) {
2199
0
            case SEEK_DONE: break;
2200
0
            case SEEK:
2201
                // Resend signal intended for bgzf_mt_reader()
2202
0
                pthread_cond_signal(&fp->mt->command_c);
2203
0
                break;
2204
0
            default:
2205
0
                abort();  // Should not get to any other state
2206
0
            }
2207
0
        } while (fp->mt->command != SEEK_DONE);
2208
2209
0
        fp->mt->command = NONE;
2210
2211
0
        if (fp->mt->errcode) {
2212
0
            fp->errcode |= BGZF_ERR_IO;
2213
0
            errno = fp->mt->errcode;
2214
0
            pthread_mutex_unlock(&fp->mt->command_m);
2215
0
            return -1;
2216
0
        }
2217
2218
0
        fp->block_length = 0;  // indicates current block has not been loaded
2219
0
        fp->block_address = block_address;
2220
0
        fp->block_offset = block_offset;
2221
2222
0
        pthread_mutex_unlock(&fp->mt->command_m);
2223
0
    } else {
2224
0
        if (hseek(fp->fp, block_address, SEEK_SET) < 0) {
2225
0
            fp->errcode |= BGZF_ERR_IO;
2226
0
            return -1;
2227
0
        }
2228
0
        fp->block_length = 0;  // indicates current block has not been loaded
2229
0
        fp->block_address = block_address;
2230
0
        fp->block_offset = block_offset;
2231
0
    }
2232
2233
0
    return 0;
2234
0
}
2235
2236
int64_t bgzf_seek(BGZF* fp, int64_t pos, int where)
2237
0
{
2238
0
    if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
2239
0
        fp->errcode |= BGZF_ERR_MISUSE;
2240
0
        return -1;
2241
0
    }
2242
2243
    // This is a flag to indicate we've jumped elsewhere in the stream, to act
2244
    // as a hint to any other code which is wrapping up bgzf for its own
2245
    // purposes.  We may not be able to tell when seek happens as it can be
2246
    // done on our behalf, e.g. by the iterator.
2247
    //
2248
    // This is never cleared here.  Any tool that needs to handle it is also
2249
    // responsible for clearing it.
2250
0
    fp->seeked = pos;
2251
2252
0
    return bgzf_seek_common(fp, pos >> 16, pos & 0xFFFF);
2253
0
}
2254
2255
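Illustrative sketch of the virtual file offsets consumed by bgzf_seek() above: the compressed block address occupies the upper 48 bits and the within-block offset the lower 16, which is also what the bgzf_tell() macro in htslib/bgzf.h returns.

#include <stdio.h>
#include <stdint.h>
#include "htslib/bgzf.h"

/* Re-create the encoding bgzf_seek() decodes with pos >> 16 and pos & 0xFFFF. */
int64_t make_voffset(int64_t block_address, int block_offset) {
    return (block_address << 16) | (block_offset & 0xFFFF);
}

/* Typical pattern: remember a position with bgzf_tell() and jump back later. */
int save_and_restore(BGZF *fp) {
    int64_t voffset = bgzf_tell(fp);
    /* ... read further into the file ... */
    return bgzf_seek(fp, voffset, SEEK_SET) < 0 ? -1 : 0;
}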
int bgzf_is_bgzf(const char *fn)
2256
0
{
2257
0
    uint8_t buf[16];
2258
0
    int n;
2259
0
    hFILE *fp;
2260
0
    if ((fp = hopen(fn, "r")) == 0) return 0;
2261
0
    n = hread(fp, buf, 16);
2262
0
    if (hclose(fp) < 0) return 0;
2263
0
    if (n != 16) return 0;
2264
0
    return check_header(buf) == 0? 1 : 0;
2265
0
}
2266
2267
int bgzf_compression(BGZF *fp)
2268
0
{
2269
0
    return (!fp->is_compressed)? no_compression : (fp->is_gzip)? gzip : bgzf;
2270
0
}
2271
2272
int bgzf_getc(BGZF *fp)
2273
1.30k
{
2274
1.30k
    if (fp->block_offset+1 < fp->block_length) {
2275
0
        fp->uncompressed_address++;
2276
0
        return ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
2277
0
    }
2278
2279
1.30k
    int c;
2280
1.30k
    if (fp->block_offset >= fp->block_length) {
2281
932
        if (bgzf_read_block(fp) != 0) return -2; /* error */
2282
929
        if (fp->block_length == 0) return -1; /* end-of-file */
2283
929
    }
2284
846
    c = ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
2285
846
    if (fp->block_offset == fp->block_length) {
2286
378
        fp->block_address = bgzf_htell(fp);
2287
378
        fp->block_offset = 0;
2288
378
        fp->block_length = 0;
2289
378
    }
2290
846
    fp->uncompressed_address++;
2291
846
    return c;
2292
1.30k
}
2293
2294
int bgzf_getline(BGZF *fp, int delim, kstring_t *str)
2295
25.3M
{
2296
25.3M
    int l, state = 0;
2297
25.3M
    str->l = 0;
2298
25.3M
    do {
2299
25.3M
        if (fp->block_offset >= fp->block_length) {
2300
48.7k
            if (bgzf_read_block(fp) != 0) { state = -2; break; }
2301
48.1k
            if (fp->block_length == 0) { state = -1; break; }
2302
48.1k
        }
2303
25.3M
        unsigned char *buf = fp->uncompressed_block;
2304
2305
        // Equivalent to a naive byte by byte search from
2306
        // buf + block_offset to buf + block_length.
2307
25.3M
        void *e = memchr(&buf[fp->block_offset], delim,
2308
25.3M
                         fp->block_length - fp->block_offset);
2309
25.3M
        l = e ? (unsigned char *)e - buf : fp->block_length;
2310
2311
25.3M
        if (l < fp->block_length) state = 1;
2312
25.3M
        l -= fp->block_offset;
2313
25.3M
        if (ks_expand(str, l + 2) < 0) { state = -3; break; }
2314
25.3M
        memcpy(str->s + str->l, buf + fp->block_offset, l);
2315
25.3M
        str->l += l;
2316
25.3M
        fp->block_offset += l + 1;
2317
25.3M
        if (fp->block_offset >= fp->block_length) {
2318
47.5k
            fp->block_address = bgzf_htell(fp);
2319
47.5k
            fp->block_offset = 0;
2320
47.5k
            fp->block_length = 0;
2321
47.5k
        }
2322
25.3M
    } while (state == 0);
2323
25.3M
    if (state < -1) return state;
2324
25.3M
    if (str->l == 0 && state < 0) return state;
2325
25.3M
    fp->uncompressed_address += str->l + 1;
2326
25.3M
    if ( delim=='\n' && str->l>0 && str->s[str->l-1]=='\r' ) str->l--;
2327
25.3M
    str->s[str->l] = 0;
2328
25.3M
    return str->l <= INT_MAX ? (int) str->l : INT_MAX;
2329
25.3M
}
2330
2331
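Illustrative usage sketch for bgzf_getline() above (input path assumed): reading a bgzip-compressed text file line by line into a kstring_t.

#include <stdio.h>
#include <stdlib.h>
#include "htslib/bgzf.h"
#include "htslib/kstring.h"

int dump_lines(void) {
    BGZF *fp = bgzf_open("example.txt.gz", "r");  /* assumed example input */
    if (!fp) return -1;
    kstring_t str = {0, 0, NULL};
    int ret;
    while ((ret = bgzf_getline(fp, '\n', &str)) >= 0)
        puts(str.s);                              /* ret is the line length */
    free(str.s);
    bgzf_close(fp);
    return ret == -1 ? 0 : -1;                    /* -1 is EOF; -2/-3 are errors */
}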
void bgzf_index_destroy(BGZF *fp)
2332
18.6k
{
2333
18.6k
    if ( !fp->idx ) return;
2334
3
    free(fp->idx->offs);
2335
3
    free(fp->idx);
2336
3
    fp->idx = NULL;
2337
3
    fp->idx_build_otf = 0;
2338
3
}
2339
2340
int bgzf_index_build_init(BGZF *fp)
2341
3
{
2342
3
    bgzf_index_destroy(fp);
2343
3
    fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
2344
3
    if ( !fp->idx ) return -1;
2345
3
    fp->idx_build_otf = 1;  // build index on the fly
2346
3
    return 0;
2347
3
}
2348
2349
int bgzf_index_add_block(BGZF *fp)
2350
0
{
2351
0
    fp->idx->noffs++;
2352
0
    if ( fp->idx->noffs > fp->idx->moffs )
2353
0
    {
2354
0
        fp->idx->moffs = fp->idx->noffs;
2355
0
        kroundup32(fp->idx->moffs);
2356
0
        fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
2357
0
        if ( !fp->idx->offs ) return -1;
2358
0
    }
2359
0
    fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->ublock_addr;
2360
0
    fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->block_address;
2361
0
    return 0;
2362
0
}
2363
2364
static inline int hwrite_uint64(uint64_t x, hFILE *f)
2365
0
{
2366
0
    if (ed_is_big()) x = ed_swap_8(x);
2367
0
    if (hwrite(f, &x, sizeof(x)) != sizeof(x)) return -1;
2368
0
    return 0;
2369
0
}
2370
2371
static char * get_name_suffix(const char *bname, const char *suffix)
2372
0
{
2373
0
    size_t len = strlen(bname) + strlen(suffix) + 1;
2374
0
    char *buff = malloc(len);
2375
0
    if (!buff) return NULL;
2376
0
    snprintf(buff, len, "%s%s", bname, suffix);
2377
0
    return buff;
2378
0
}
2379
2380
int bgzf_index_dump_hfile(BGZF *fp, struct hFILE *idx, const char *name)
2381
0
{
2382
    // Note that the index contains one extra record when indexing files opened
2383
    // for reading. The terminating record is not present when opened for writing.
2384
    // This is not a bug.
2385
2386
0
    int i;
2387
2388
0
    if (!fp->idx) {
2389
0
        hts_log_error("Called for BGZF handle with no index");
2390
0
        errno = EINVAL;
2391
0
        return -1;
2392
0
    }
2393
2394
0
    if (bgzf_flush(fp) != 0) return -1;
2395
2396
    // discard the entry marking the end of the file
2397
0
    if (fp->mt && fp->idx)
2398
0
        fp->idx->noffs--;
2399
2400
0
    if (hwrite_uint64(fp->idx->noffs - 1, idx) < 0) goto fail;
2401
0
    for (i=1; i<fp->idx->noffs; i++)
2402
0
    {
2403
0
        if (hwrite_uint64(fp->idx->offs[i].caddr, idx) < 0) goto fail;
2404
0
        if (hwrite_uint64(fp->idx->offs[i].uaddr, idx) < 0) goto fail;
2405
0
    }
2406
0
    return 0;
2407
2408
0
 fail:
2409
0
    hts_log_error("Error writing to %s : %s", name ? name : "index", strerror(errno));
2410
0
    return -1;
2411
0
}
2412
2413
int bgzf_index_dump(BGZF *fp, const char *bname, const char *suffix)
2414
0
{
2415
0
    const char *name = bname, *msg = NULL;
2416
0
    char *tmp = NULL;
2417
0
    hFILE *idx = NULL;
2418
2419
0
    if (!fp->idx) {
2420
0
        hts_log_error("Called for BGZF handle with no index");
2421
0
        errno = EINVAL;
2422
0
        return -1;
2423
0
    }
2424
2425
0
    if ( suffix )
2426
0
    {
2427
0
        tmp = get_name_suffix(bname, suffix);
2428
0
        if ( !tmp ) return -1;
2429
0
        name = tmp;
2430
0
    }
2431
2432
0
    idx = hopen(name, "wb");
2433
0
    if ( !idx ) {
2434
0
        msg = "Error opening";
2435
0
        goto fail;
2436
0
    }
2437
2438
0
    if (bgzf_index_dump_hfile(fp, idx, name) != 0) goto fail;
2439
2440
0
    if (hclose(idx) < 0)
2441
0
    {
2442
0
        idx = NULL;
2443
0
        msg = "Error on closing";
2444
0
        goto fail;
2445
0
    }
2446
2447
0
    free(tmp);
2448
0
    return 0;
2449
2450
0
 fail:
2451
0
    if (msg != NULL) {
2452
0
        hts_log_error("%s %s : %s", msg, name, strerror(errno));
2453
0
    }
2454
0
    if (idx) hclose_abruptly(idx);
2455
0
    free(tmp);
2456
0
    return -1;
2457
0
}
2458
2459
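Illustrative usage sketch (paths and data assumed): building a .gzi index on the fly with bgzf_index_build_init() while writing, then saving it with bgzf_index_dump() as implemented above.

#include <string.h>
#include "htslib/bgzf.h"

int write_with_index(void) {
    BGZF *fp = bgzf_open("example.fa.gz", "w");           /* assumed example path */
    if (!fp) return -1;
    if (bgzf_index_build_init(fp) < 0) { bgzf_close(fp); return -1; }
    const char seq[] = ">chr1\nACGTACGT\n";
    if (bgzf_write(fp, seq, strlen(seq)) < 0) { bgzf_close(fp); return -1; }
    /* Writes example.fa.gz.gzi next to the data file. */
    if (bgzf_index_dump(fp, "example.fa.gz", ".gzi") < 0) { bgzf_close(fp); return -1; }
    return bgzf_close(fp);
}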
static inline int hread_uint64(uint64_t *xptr, hFILE *f)
2460
0
{
2461
0
    if (hread(f, xptr, sizeof(*xptr)) != sizeof(*xptr)) return -1;
2462
0
    if (ed_is_big()) ed_swap_8p(xptr);
2463
0
    return 0;
2464
0
}
2465
2466
int bgzf_index_load_hfile(BGZF *fp, struct hFILE *idx, const char *name)
2467
0
{
2468
0
    fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
2469
0
    if (fp->idx == NULL) goto fail;
2470
0
    uint64_t x;
2471
0
    if (hread_uint64(&x, idx) < 0) goto fail;
2472
2473
0
    fp->idx->noffs = fp->idx->moffs = x + 1;
2474
0
    fp->idx->offs  = (bgzidx1_t*) malloc(fp->idx->moffs*sizeof(bgzidx1_t));
2475
0
    if (fp->idx->offs == NULL) goto fail;
2476
0
    fp->idx->offs[0].caddr = fp->idx->offs[0].uaddr = 0;
2477
2478
0
    int i;
2479
0
    for (i=1; i<fp->idx->noffs; i++)
2480
0
    {
2481
0
        if (hread_uint64(&fp->idx->offs[i].caddr, idx) < 0) goto fail;
2482
0
        if (hread_uint64(&fp->idx->offs[i].uaddr, idx) < 0) goto fail;
2483
0
    }
2484
2485
0
    return 0;
2486
2487
0
 fail:
2488
0
    hts_log_error("Error reading %s : %s", name ? name : "index", strerror(errno));
2489
0
    if (fp->idx) {
2490
0
        free(fp->idx->offs);
2491
0
        free(fp->idx);
2492
0
        fp->idx = NULL;
2493
0
    }
2494
0
    return -1;
2495
0
}
2496
2497
int bgzf_index_load(BGZF *fp, const char *bname, const char *suffix)
2498
0
{
2499
0
    const char *name = bname, *msg = NULL;
2500
0
    char *tmp = NULL;
2501
0
    hFILE *idx = NULL;
2502
0
    if ( suffix )
2503
0
    {
2504
0
        tmp = get_name_suffix(bname, suffix);
2505
0
        if ( !tmp ) return -1;
2506
0
        name = tmp;
2507
0
    }
2508
2509
0
    idx = hopen(name, "rb");
2510
0
    if ( !idx ) {
2511
0
        msg = "Error opening";
2512
0
        goto fail;
2513
0
    }
2514
2515
0
    if (bgzf_index_load_hfile(fp, idx, name) != 0) goto fail;
2516
2517
0
    if (hclose(idx) != 0) {
2518
0
        idx = NULL;
2519
0
        msg = "Error closing";
2520
0
        goto fail;
2521
0
    }
2522
2523
0
    free(tmp);
2524
0
    return 0;
2525
2526
0
 fail:
2527
0
    if (msg != NULL) {
2528
0
        hts_log_error("%s %s : %s", msg, name, strerror(errno));
2529
0
    }
2530
0
    if (idx) hclose_abruptly(idx);
2531
0
    free(tmp);
2532
0
    return -1;
2533
0
}
2534
2535
int bgzf_useek(BGZF *fp, off_t uoffset, int where)
2536
0
{
2537
0
    if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
2538
0
        fp->errcode |= BGZF_ERR_MISUSE;
2539
0
        return -1;
2540
0
    }
2541
0
    if (uoffset >= fp->uncompressed_address - fp->block_offset &&
2542
0
        uoffset < fp->uncompressed_address + fp->block_length - fp->block_offset) {
2543
        // Can seek into existing data
2544
0
        fp->block_offset += uoffset - fp->uncompressed_address;
2545
0
        fp->uncompressed_address = uoffset;
2546
0
        return 0;
2547
0
    }
2548
0
    if ( !fp->is_compressed )
2549
0
    {
2550
0
        if (hseek(fp->fp, uoffset, SEEK_SET) < 0)
2551
0
        {
2552
0
            fp->errcode |= BGZF_ERR_IO;
2553
0
            return -1;
2554
0
        }
2555
0
        fp->block_length = 0;  // indicates current block has not been loaded
2556
0
        fp->block_address = uoffset;
2557
0
        fp->block_offset = 0;
2558
0
        if (bgzf_read_block(fp) < 0) {
2559
0
            fp->errcode |= BGZF_ERR_IO;
2560
0
            return -1;
2561
0
        }
2562
0
        fp->uncompressed_address = uoffset;
2563
0
        return 0;
2564
0
    }
2565
2566
0
    if ( !fp->idx )
2567
0
    {
2568
0
        fp->errcode |= BGZF_ERR_IO;
2569
0
        return -1;
2570
0
    }
2571
2572
    // binary search
2573
0
    int ilo = 0, ihi = fp->idx->noffs - 1;
2574
0
    while ( ilo<=ihi )
2575
0
    {
2576
0
        int i = (ilo+ihi)*0.5;
2577
0
        if ( uoffset < fp->idx->offs[i].uaddr ) ihi = i - 1;
2578
0
        else if ( uoffset >= fp->idx->offs[i].uaddr ) ilo = i + 1;
2579
0
        else break;
2580
0
    }
2581
0
    int i = ilo-1;
2582
0
    off_t offset = 0;
2583
0
    if (bgzf_seek_common(fp, fp->idx->offs[i].caddr, 0) < 0)
2584
0
        return -1;
2585
2586
0
    if ( bgzf_read_block(fp) < 0 ) {
2587
0
        fp->errcode |= BGZF_ERR_IO;
2588
0
        return -1;
2589
0
    }
2590
0
    offset = uoffset - fp->idx->offs[i].uaddr;
2591
0
    if ( offset > 0 )
2592
0
    {
2593
0
        if (offset > fp->block_length) {
2594
0
            fp->errcode |= BGZF_ERR_IO;
2595
0
            return -1;                                      //offset outside the available data
2596
0
        }
2597
0
        fp->block_offset = offset;
2598
0
        assert( fp->block_offset <= fp->block_length );     // todo: skipped, unindexed, blocks
2599
0
    }
2600
0
    fp->uncompressed_address = uoffset;
2601
0
    return 0;
2602
0
}
2603
2604
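Illustrative usage sketch for bgzf_useek() above (paths assumed): loading a previously written .gzi index with bgzf_index_load() and then seeking to an uncompressed offset before reading.

#include <stdio.h>
#include <sys/types.h>
#include "htslib/bgzf.h"

int read_at(off_t uoffset, void *buf, size_t len) {
    BGZF *fp = bgzf_open("example.fa.gz", "r");                /* assumed example path    */
    if (!fp) return -1;
    if (bgzf_index_load(fp, "example.fa.gz", ".gzi") < 0 ||    /* loads example.fa.gz.gzi */
        bgzf_useek(fp, uoffset, SEEK_SET) < 0 ||
        bgzf_read(fp, buf, len) < 0) {
        bgzf_close(fp);
        return -1;
    }
    return bgzf_close(fp);
}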
off_t bgzf_utell(BGZF *fp)
2605
23.9k
{
2606
23.9k
    return fp->uncompressed_address;    // currently maintained only when reading
2607
23.9k
}
2608
2609
/* prototype is in hfile_internal.h */
2610
0
struct hFILE *bgzf_hfile(struct BGZF *fp) {
2611
0
    return fp->fp;
2612
0
}