Coverage Report

Created: 2025-10-10 07:28

/src/htslib/bgzf.c
Line | Count | Source
1
/* The MIT License
2
3
   Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4
                 2011, 2012 Attractive Chaos <attractor@live.co.uk>
5
   Copyright (C) 2009, 2013-2025 Genome Research Ltd
6
7
   Permission is hereby granted, free of charge, to any person obtaining a copy
8
   of this software and associated documentation files (the "Software"), to deal
9
   in the Software without restriction, including without limitation the rights
10
   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11
   copies of the Software, and to permit persons to whom the Software is
12
   furnished to do so, subject to the following conditions:
13
14
   The above copyright notice and this permission notice shall be included in
15
   all copies or substantial portions of the Software.
16
17
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19
   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20
   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21
   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22
   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23
   THE SOFTWARE.
24
*/
25
26
#define HTS_BUILDING_LIBRARY // Enables HTSLIB_EXPORT, see htslib/hts_defs.h
27
#include <config.h>
28
29
#include <stdio.h>
30
#include <stdlib.h>
31
#include <string.h>
32
#include <errno.h>
33
#include <unistd.h>
34
#include <assert.h>
35
#include <pthread.h>
36
#include <sys/types.h>
37
#include <inttypes.h>
38
#include <zlib.h>
39
40
#ifdef HAVE_LIBDEFLATE
41
#include <libdeflate.h>
42
#endif
43
44
#include "htslib/hts.h"
45
#include "htslib/bgzf.h"
46
#include "htslib/hfile.h"
47
#include "htslib/thread_pool.h"
48
#include "htslib/hts_endian.h"
49
#include "cram/pooled_alloc.h"
50
#include "hts_internal.h"
51
52
#ifndef EFTYPE
53
24
#define EFTYPE ENOEXEC
54
#endif
55
56
#define BGZF_CACHE
57
#define BGZF_MT
58
59
148k
#define BLOCK_HEADER_LENGTH 18
60
73.9k
#define BLOCK_FOOTER_LENGTH 8
61
62
63
/* BGZF/GZIP header (specialized from RFC 1952; little endian):
64
 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
65
 | 31|139|  8|  4|              0|  0|255|      6| 66| 67|      2|BLK_LEN|
66
 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
67
  BGZF extension:
68
                ^                              ^   ^   ^
69
                |                              |   |   |
70
               FLG.EXTRA                     XLEN  B   C
71
72
  BGZF format is compatible with GZIP. It limits the size of each compressed
73
  block to 2^16 bytes and adds and an extra "BC" field in the gzip header which
74
  records the size.
75
76
*/
77
static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
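
As a minimal illustration of the layout sketched above (an editor's sketch, not part of the bgzf.c listing; the hdr pointer is hypothetical), the "BC" extra field lets a reader hop from block to block without inflating anything: bytes 16-17 of the 18-byte header hold BSIZE, the total block length minus one.

    /* hdr points at the first byte of a BGZF block; at least
     * BLOCK_HEADER_LENGTH (18) bytes are assumed to be available. */
    int total_block_length  = (hdr[16] | (hdr[17] << 8)) + 1;     /* BSIZE + 1 */
    int deflate_payload_len = total_block_length
                              - BLOCK_HEADER_LENGTH   /* 18-byte gzip header  */
                              - BLOCK_FOOTER_LENGTH;  /* 8-byte CRC32 + ISIZE */
    /* Seeking ahead by total_block_length bytes lands on the next block. */
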
78
79
#ifdef BGZF_CACHE
80
typedef struct {
81
    int size;
82
    uint8_t *block;
83
    int64_t end_offset;
84
} cache_t;
85
86
#include "htslib/khash.h"
87
KHASH_MAP_INIT_INT64(cache, cache_t)
88
#endif
89
90
struct bgzf_cache_t {
91
    khash_t(cache) *h;
92
    khint_t last_pos;
93
};
94
95
#ifdef BGZF_MT
96
97
typedef struct bgzf_job {
98
    BGZF *fp;
99
    unsigned char comp_data[BGZF_MAX_BLOCK_SIZE];
100
    size_t comp_len;
101
    unsigned char uncomp_data[BGZF_MAX_BLOCK_SIZE];
102
    size_t uncomp_len;
103
    int errcode;
104
    int64_t block_address;
105
    int hit_eof;
106
} bgzf_job;
107
108
enum mtaux_cmd {
109
    NONE = 0,
110
    SEEK,
111
    SEEK_DONE,
112
    HAS_EOF,
113
    HAS_EOF_DONE,
114
    CLOSE,
115
};
116
117
// When multi-threaded, bgzf_tell won't work, so we delay the hts_idx_push
118
// until we've written the last block.
119
typedef struct {
120
    hts_pos_t beg, end;
121
    int tid, is_mapped;  // args for hts_idx_push
122
    uint64_t offset, block_number;
123
} hts_idx_cache_entry;
124
125
typedef struct {
126
    int nentries, mentries; // used and allocated
127
    hts_idx_cache_entry *e; // hts_idx elements
128
} hts_idx_cache_t;
129
130
typedef struct bgzf_mtaux_t {
131
    // Memory pool for bgzf_job structs, to avoid many malloc/free
132
    pool_alloc_t *job_pool;
133
    bgzf_job *curr_job;
134
135
    // Thread pool
136
    int n_threads;
137
    int own_pool;
138
    hts_tpool *pool;
139
140
    // Output queue holding completed bgzf_jobs
141
    hts_tpool_process *out_queue;
142
143
    // I/O thread.
144
    pthread_t io_task;
145
    pthread_mutex_t job_pool_m;
146
    int jobs_pending; // number of jobs waiting
147
    int flush_pending;
148
    void *free_block;
149
    int hit_eof;  // r/w entirely within main thread
150
151
    // Message passing to the reader thread; eg seek requests
152
    int errcode;
153
    uint64_t block_address;
154
    int eof;
155
    pthread_mutex_t command_m; // Set whenever fp is being updated
156
    pthread_cond_t command_c;
157
    enum mtaux_cmd command;
158
159
    // For multi-threaded on-the-fly indexing. See bgzf_idx_push below.
160
    pthread_mutex_t idx_m;
161
    hts_idx_t *hts_idx;
162
    uint64_t block_number, block_written;
163
    hts_idx_cache_t idx_cache;
164
} mtaux_t;
165
#endif
166
167
typedef struct
168
{
169
    uint64_t uaddr;  // offset w.r.t. uncompressed data
170
    uint64_t caddr;  // offset w.r.t. compressed data
171
}
172
bgzidx1_t;
173
174
struct bgzidx_t
175
{
176
    int noffs, moffs;       // the size of the index, n:used, m:allocated
177
    bgzidx1_t *offs;        // offsets
178
    uint64_t ublock_addr;   // offset of the current block (uncompressed data)
179
};
180
181
/*
182
 * Buffers up arguments to hts_idx_push for later use, once we've written all bar
183
 * this block.  This is necessary when multiple blocks are in flight (threading)
184
 * and fp->block_address isn't known at the time of call as we have in-flight
185
 * blocks that haven't yet been compressed.
186
 *
187
 * NB: this only matters when we're indexing on the fly (writing).
188
 * Normal indexing is threaded reads, but we already know block sizes
189
 * so it's a simpler process
190
 *
191
 * Returns 0 on success,
192
 *        -1 on failure
193
 */
194
0
int bgzf_idx_push(BGZF *fp, hts_idx_t *hidx, int tid, hts_pos_t beg, hts_pos_t end, uint64_t offset, int is_mapped) {
195
0
    hts_idx_cache_entry *e;
196
0
    mtaux_t *mt = fp->mt;
197
198
0
    if (!mt)
199
0
        return hts_idx_push(hidx, tid, beg, end, offset, is_mapped);
200
201
    // Early check for out of range positions which would fail in hts_idx_push()
202
0
    if (hts_idx_check_range(hidx, tid, beg, end) < 0)
203
0
        return -1;
204
205
0
    pthread_mutex_lock(&mt->idx_m);
206
207
0
    mt->hts_idx = hidx;
208
0
    hts_idx_cache_t *ic = &mt->idx_cache;
209
210
0
    if (ic->nentries >= ic->mentries) {
211
0
        int new_sz = ic->mentries ? ic->mentries*2 : 1024;
212
0
        if (!(e = realloc(ic->e, new_sz * sizeof(*ic->e)))) {
213
0
            pthread_mutex_unlock(&mt->idx_m);
214
0
            return -1;
215
0
        }
216
0
        ic->e = e;
217
0
        ic->mentries = new_sz;
218
0
    }
219
220
0
    e = &ic->e[ic->nentries++];
221
0
    e->tid = tid;
222
0
    e->beg = beg;
223
0
    e->end = end;
224
0
    e->is_mapped = is_mapped;
225
0
    e->offset = offset & 0xffff;
226
0
    e->block_number = mt->block_number;
227
228
0
    pthread_mutex_unlock(&mt->idx_m);
229
230
0
    return 0;
231
0
}
232
233
static int bgzf_idx_flush(BGZF *fp,
234
0
                          size_t block_uncomp_len, size_t block_comp_len) {
235
0
    mtaux_t *mt = fp->mt;
236
237
0
    if (!mt->idx_cache.e) {
238
0
        mt->block_written++;
239
0
        return 0;
240
0
    }
241
242
0
    pthread_mutex_lock(&mt->idx_m);
243
244
0
    hts_idx_cache_entry *e = mt->idx_cache.e;
245
0
    int i;
246
247
0
    assert(mt->idx_cache.nentries == 0 || mt->block_written <= e[0].block_number);
248
249
0
    for (i = 0; i < mt->idx_cache.nentries && e[i].block_number == mt->block_written; i++) {
250
0
        if (block_uncomp_len > 0 && e[i].offset == block_uncomp_len) {
251
            /*
252
             * If the virtual offset is at the end of the current block,
253
             * adjust it to point to the start of the next one.  This
254
             * is needed when on-the-fly indexing has recorded a virtual
255
             * offset just before a new block has been started, and makes
256
             * on-the-fly and standard indexing give exactly the same results.
257
             *
258
             * In theory the two virtual offsets are equivalent, but pointing
259
             * to the end of a block is inefficient, and caused problems with
260
             * versions of HTSlib before 1.11 where bgzf_read() would
261
             * incorrectly return EOF.
262
             */
263
264
            // Assert that this is the last entry for the current block_number
265
0
            assert(i == mt->idx_cache.nentries - 1
266
0
                   || e[i].block_number < e[i + 1].block_number);
267
268
            // Work out where the next block starts.  For this entry, the
269
            // offset will be zero.
270
0
            uint64_t next_block_addr = mt->block_address + block_comp_len;
271
0
            if (hts_idx_push(mt->hts_idx, e[i].tid, e[i].beg, e[i].end,
272
0
                             next_block_addr << 16, e[i].is_mapped) < 0) {
273
0
                pthread_mutex_unlock(&mt->idx_m);
274
0
                return -1;
275
0
            }
276
            // Count this entry and drop out of the loop
277
0
            i++;
278
0
            break;
279
0
        }
280
281
0
        if (hts_idx_push(mt->hts_idx, e[i].tid, e[i].beg, e[i].end,
282
0
                         (mt->block_address << 16) + e[i].offset,
283
0
                         e[i].is_mapped) < 0) {
284
0
            pthread_mutex_unlock(&mt->idx_m);
285
0
            return -1;
286
0
        }
287
0
    }
288
289
0
    memmove(&e[0], &e[i], (mt->idx_cache.nentries - i) * sizeof(*e));
290
0
    mt->idx_cache.nentries -= i;
291
0
    mt->block_written++;
292
293
0
    pthread_mutex_unlock(&mt->idx_m);
294
0
    return 0;
295
0
}
296
297
void bgzf_index_destroy(BGZF *fp);
298
int bgzf_index_add_block(BGZF *fp);
299
static int mt_destroy(mtaux_t *mt);
300
301
static inline void packInt16(uint8_t *buffer, uint16_t value)
302
36.9k
{
303
36.9k
    buffer[0] = value;
304
36.9k
    buffer[1] = value >> 8;
305
36.9k
}
306
307
static inline int unpackInt16(const uint8_t *buffer)
308
1.46k
{
309
1.46k
    return buffer[0] | buffer[1] << 8;
310
1.46k
}
311
312
static inline void packInt32(uint8_t *buffer, uint32_t value)
313
73.9k
{
314
73.9k
    buffer[0] = value;
315
73.9k
    buffer[1] = value >> 8;
316
73.9k
    buffer[2] = value >> 16;
317
73.9k
    buffer[3] = value >> 24;
318
73.9k
}
319
320
static void razf_info(hFILE *hfp, const char *filename)
321
24
{
322
24
    uint64_t usize, csize;
323
24
    off_t sizes_pos;
324
325
24
    if (filename == NULL || strcmp(filename, "-") == 0) filename = "FILE";
326
327
    // RAZF files end with USIZE,CSIZE stored as big-endian uint64_t
328
24
    if ((sizes_pos = hseek(hfp, -16, SEEK_END)) < 0) goto no_sizes;
329
24
    if (hread(hfp, &usize, 8) != 8 || hread(hfp, &csize, 8) != 8) goto no_sizes;
330
24
    if (!ed_is_big()) ed_swap_8p(&usize), ed_swap_8p(&csize);
331
24
    if (csize >= sizes_pos) goto no_sizes; // Very basic validity check
332
333
0
    hts_log_error(
334
0
"To decompress this file, use the following commands:\n"
335
0
"    truncate -s %" PRIu64 " %s\n"
336
0
"    gunzip %s\n"
337
0
"The resulting uncompressed file should be %" PRIu64 " bytes in length.\n"
338
0
"If you do not have a truncate command, skip that step (though gunzip will\n"
339
0
"likely produce a \"trailing garbage ignored\" message, which can be ignored).",
340
0
                  csize, filename, filename, usize);
341
0
    return;
342
343
24
no_sizes:
344
24
    hts_log_error(
345
24
"To decompress this file, use the following command:\n"
346
24
"    gunzip %s\n"
347
24
"This will likely produce a \"trailing garbage ignored\" message, which can\n"
348
24
"usually be safely ignored.", filename);
349
24
}
350
351
static const char *bgzf_zerr(int errnum, z_stream *zs)
352
291
{
353
291
    static char buffer[32];
354
355
    /* Return zs->msg if available.
356
       zlib doesn't set this very reliably.  Looking at the source suggests
357
       that it may get set to a useful message for deflateInit2, inflateInit2
358
       and inflate when it returns Z_DATA_ERROR. For inflate with other
359
       return codes, deflate, deflateEnd and inflateEnd it doesn't appear
360
       to be useful.  For the likely non-useful cases, the caller should
361
       pass NULL into zs. */
362
363
291
    if (zs && zs->msg) return zs->msg;
364
365
    // gzerror OF((gzFile file, int *errnum)
366
3
    switch (errnum) {
367
3
    case Z_ERRNO:
368
3
        return strerror(errno);
369
0
    case Z_STREAM_ERROR:
370
0
        return "invalid parameter/compression level, or inconsistent stream state";
371
0
    case Z_DATA_ERROR:
372
0
        return "invalid or incomplete IO";
373
0
    case Z_MEM_ERROR:
374
0
        return "out of memory";
375
0
    case Z_BUF_ERROR:
376
0
        return "progress temporarily not possible, or in() / out() returned an error";
377
0
    case Z_VERSION_ERROR:
378
0
        return "zlib version mismatch";
379
0
    case Z_NEED_DICT:
380
0
        return "data was compressed using a dictionary";
381
0
    case Z_OK: // 0: maybe gzgets error Z_NULL
382
0
    default:
383
0
        snprintf(buffer, sizeof(buffer), "[%d] unknown", errnum);
384
0
        return buffer;  // FIXME: Not thread-safe.
385
3
    }
386
3
}
387
388
static BGZF *bgzf_read_init(hFILE *hfpr, const char *filename)
389
4.28k
{
390
4.28k
    BGZF *fp;
391
4.28k
    uint8_t magic[18];
392
4.28k
    ssize_t n = hpeek(hfpr, magic, 18);
393
4.28k
    if (n < 0) return NULL;
394
395
4.28k
    fp = (BGZF*)calloc(1, sizeof(BGZF));
396
4.28k
    if (fp == NULL) return NULL;
397
398
4.28k
    fp->is_write = 0;
399
4.28k
    fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
400
4.28k
    if (fp->uncompressed_block == NULL) { free(fp); return NULL; }
401
4.28k
    fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
402
4.28k
    fp->is_compressed = (n==18 && magic[0]==0x1f && magic[1]==0x8b);
403
4.28k
    fp->is_gzip = ( !fp->is_compressed || ((magic[3]&4) && memcmp(&magic[12], "BC\2\0",4)==0) ) ? 0 : 1;
404
4.28k
    if (fp->is_compressed && (magic[3]&4) && memcmp(&magic[12], "RAZF", 4)==0) {
405
24
        hts_log_error("Cannot decompress legacy RAZF format");
406
24
        razf_info(hfpr, filename);
407
24
        free(fp->uncompressed_block);
408
24
        free(fp);
409
24
        errno = EFTYPE;
410
24
        return NULL;
411
24
    }
412
4.26k
#ifdef BGZF_CACHE
413
4.26k
    if (!(fp->cache = malloc(sizeof(*fp->cache)))) {
414
0
        free(fp->uncompressed_block);
415
0
        free(fp);
416
0
        return NULL;
417
0
    }
418
4.26k
    if (!(fp->cache->h = kh_init(cache))) {
419
0
        free(fp->uncompressed_block);
420
0
        free(fp->cache);
421
0
        free(fp);
422
0
        return NULL;
423
0
    }
424
4.26k
    fp->cache->last_pos = 0;
425
4.26k
#endif
426
4.26k
    return fp;
427
4.26k
}
428
429
// get the compress level from the mode string: compress_level==-1 for the default level, -2 plain uncompressed
430
static int mode2level(const char *mode)
431
7.59k
{
432
7.59k
    int i, compress_level = -1;
433
22.7k
    for (i = 0; mode[i]; ++i)
434
15.1k
        if (mode[i] >= '0' && mode[i] <= '9') break;
435
7.59k
    if (mode[i]) compress_level = (int)mode[i] - '0';
436
7.59k
    if (strchr(mode, 'u')) compress_level = -2;
437
7.59k
    return compress_level;
438
7.59k
}
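
For reference, the mapping implemented above, shown on a few illustrative mode strings (an editor's illustration, not part of the listing): any digit selects that zlib level, a 'u' forces plain uncompressed output, and anything else falls back to the default.

    mode2level("r")    /* -1: default level (unused when reading)       */
    mode2level("w")    /* -1: Z_DEFAULT_COMPRESSION                     */
    mode2level("w1")   /*  1: fastest deflate                           */
    mode2level("w9")   /*  9: best compression                          */
    mode2level("wu")   /* -2: plain uncompressed output, no BGZF blocks */
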
439
static BGZF *bgzf_write_init(const char *mode)
440
7.59k
{
441
7.59k
    BGZF *fp;
442
7.59k
    fp = (BGZF*)calloc(1, sizeof(BGZF));
443
7.59k
    if (fp == NULL) goto mem_fail;
444
7.59k
    fp->is_write = 1;
445
7.59k
    int compress_level = mode2level(mode);
446
7.59k
    if ( compress_level==-2 )
447
0
    {
448
0
        fp->is_compressed = 0;
449
0
        return fp;
450
0
    }
451
7.59k
    fp->is_compressed = 1;
452
453
7.59k
    fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
454
7.59k
    if (fp->uncompressed_block == NULL) goto mem_fail;
455
7.59k
    fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
456
457
7.59k
    fp->compress_level = compress_level < 0? Z_DEFAULT_COMPRESSION : compress_level; // Z_DEFAULT_COMPRESSION==-1
458
7.59k
    if (fp->compress_level > 9) fp->compress_level = Z_DEFAULT_COMPRESSION;
459
7.59k
    if ( strchr(mode,'g') )
460
0
    {
461
        // gzip output
462
0
        fp->is_gzip = 1;
463
0
        fp->gz_stream = (z_stream*)calloc(1,sizeof(z_stream));
464
0
        if (fp->gz_stream == NULL) goto mem_fail;
465
0
        fp->gz_stream->zalloc = NULL;
466
0
        fp->gz_stream->zfree  = NULL;
467
0
        fp->gz_stream->msg    = NULL;
468
469
0
        int ret = deflateInit2(fp->gz_stream, fp->compress_level, Z_DEFLATED, 15|16, 8, Z_DEFAULT_STRATEGY);
470
0
        if (ret!=Z_OK) {
471
0
            hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret, fp->gz_stream));
472
0
            goto fail;
473
0
        }
474
0
    }
475
7.59k
    return fp;
476
477
0
mem_fail:
478
0
    hts_log_error("%s", strerror(errno));
479
480
0
fail:
481
0
    if (fp != NULL) {
482
0
        free(fp->uncompressed_block);
483
0
        free(fp->gz_stream);
484
0
        free(fp);
485
0
    }
486
0
    return NULL;
487
0
}
488
489
BGZF *bgzf_open(const char *path, const char *mode)
490
1.45k
{
491
1.45k
    BGZF *fp = 0;
492
1.45k
    if (strchr(mode, 'r')) {
493
1.45k
        hFILE *fpr;
494
1.45k
        if ((fpr = hopen(path, mode)) == 0) return 0;
495
560
        fp = bgzf_read_init(fpr, path);
496
560
        if (fp == 0) { hclose_abruptly(fpr); return NULL; }
497
560
        fp->fp = fpr;
498
560
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
499
0
        hFILE *fpw;
500
0
        if ((fpw = hopen(path, mode)) == 0) return 0;
501
0
        fp = bgzf_write_init(mode);
502
0
        if (fp == NULL) return NULL;
503
0
        fp->fp = fpw;
504
0
    }
505
0
    else { errno = EINVAL; return 0; }
506
507
560
    fp->is_be = ed_is_big();
508
560
    return fp;
509
1.45k
}
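
A short usage sketch for the reading path (editor's illustration; the file name is hypothetical): bgzf_open() accepts plain, gzip and BGZF input, and bgzf_read() inflates transparently.

    BGZF *bg = bgzf_open("example.txt.gz", "r");
    if (bg) {
        char buf[4096];
        ssize_t n = bgzf_read(bg, buf, sizeof buf);   /* n < 0 on error */
        /* ... consume the first n bytes ... */
        bgzf_close(bg);                               /* closes the underlying stream */
    }
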
510
511
BGZF *bgzf_dopen(int fd, const char *mode)
512
0
{
513
0
    BGZF *fp = 0;
514
0
    if (strchr(mode, 'r')) {
515
0
        hFILE *fpr;
516
0
        if ((fpr = hdopen(fd, mode)) == 0) return 0;
517
0
        fp = bgzf_read_init(fpr, NULL);
518
0
        if (fp == 0) { hclose_abruptly(fpr); return NULL; } // FIXME this closes fd
519
0
        fp->fp = fpr;
520
0
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
521
0
        hFILE *fpw;
522
0
        if ((fpw = hdopen(fd, mode)) == 0) return 0;
523
0
        fp = bgzf_write_init(mode);
524
0
        if (fp == NULL) return NULL;
525
0
        fp->fp = fpw;
526
0
    }
527
0
    else { errno = EINVAL; return 0; }
528
529
0
    fp->is_be = ed_is_big();
530
0
    return fp;
531
0
}
532
533
BGZF *bgzf_hopen(hFILE *hfp, const char *mode)
534
11.3k
{
535
11.3k
    BGZF *fp = NULL;
536
11.3k
    if (strchr(mode, 'r')) {
537
3.72k
        fp = bgzf_read_init(hfp, NULL);
538
3.72k
        if (fp == NULL) return NULL;
539
7.59k
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
540
7.59k
        fp = bgzf_write_init(mode);
541
7.59k
        if (fp == NULL) return NULL;
542
7.59k
    }
543
0
    else { errno = EINVAL; return 0; }
544
545
11.2k
    fp->fp = hfp;
546
11.2k
    fp->is_be = ed_is_big();
547
11.2k
    return fp;
548
11.3k
}
549
550
#ifdef HAVE_LIBDEFLATE
551
uint32_t hts_crc32(uint32_t crc, const void *buf, size_t len) {
552
    return libdeflate_crc32(crc, buf, len);
553
}
554
555
int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
556
{
557
    if (slen == 0) {
558
        // EOF block
559
        if (*dlen < 28) return -1;
560
        memcpy(_dst, "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", 28);
561
        *dlen = 28;
562
        return 0;
563
    }
564
565
    uint8_t *dst = (uint8_t*)_dst;
566
567
    if (level == 0) {
568
        // Uncompressed data
569
        if (*dlen < slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH) return -1;
570
        dst[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
571
        u16_to_le(slen,  &dst[BLOCK_HEADER_LENGTH+1]); // length
572
        u16_to_le(~slen, &dst[BLOCK_HEADER_LENGTH+3]); // ones-complement length
573
        memcpy(dst + BLOCK_HEADER_LENGTH+5, src, slen);
574
        *dlen = slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
575
576
    } else {
577
        level = level > 0 ? level : 6; // libdeflate doesn't honour -1 as default
578
        // NB levels go up to 12 here.
579
        int lvl_map[] = {0,1,2,3,5,6,7,8,10,12};
580
        level = lvl_map[level>9 ?9 :level];
581
        struct libdeflate_compressor *z = libdeflate_alloc_compressor(level);
582
        if (!z) return -1;
583
584
        // Raw deflate
585
        size_t clen =
586
            libdeflate_deflate_compress(z, src, slen,
587
                                        dst + BLOCK_HEADER_LENGTH,
588
                                        *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH);
589
590
        if (clen <= 0) {
591
            hts_log_error("Call to libdeflate_deflate_compress failed");
592
            libdeflate_free_compressor(z);
593
            return -1;
594
        }
595
596
        *dlen = clen + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
597
598
        libdeflate_free_compressor(z);
599
    }
600
601
    // write the header
602
    memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
603
    packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
604
605
    // write the footer
606
    uint32_t crc = libdeflate_crc32(0, src, slen);
607
    packInt32((uint8_t*)&dst[*dlen - 8], crc);
608
    packInt32((uint8_t*)&dst[*dlen - 4], slen);
609
    return 0;
610
}
611
612
#else
613
614
0
uint32_t hts_crc32(uint32_t crc, const void *buf, size_t len) {
615
0
    return crc32(crc, buf, len);
616
0
}
617
618
int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
619
36.9k
{
620
36.9k
    uint32_t crc;
621
36.9k
    z_stream zs;
622
36.9k
    uint8_t *dst = (uint8_t*)_dst;
623
624
36.9k
    if (level == 0) {
625
0
    uncomp:
626
        // Uncompressed data
627
0
        if (*dlen < slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH) return -1;
628
0
        dst[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
629
0
        u16_to_le(slen,  &dst[BLOCK_HEADER_LENGTH+1]); // length
630
0
        u16_to_le(~slen, &dst[BLOCK_HEADER_LENGTH+3]); // ones-complement length
631
0
        memcpy(dst + BLOCK_HEADER_LENGTH+5, src, slen);
632
0
        *dlen = slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
633
36.9k
    } else {
634
        // compress the body
635
36.9k
        zs.zalloc = NULL; zs.zfree = NULL;
636
36.9k
        zs.msg = NULL;
637
36.9k
        zs.next_in  = (Bytef*)src;
638
36.9k
        zs.avail_in = slen;
639
36.9k
        zs.next_out = dst + BLOCK_HEADER_LENGTH;
640
36.9k
        zs.avail_out = *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
641
36.9k
        int ret = deflateInit2(&zs, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); // -15 to disable zlib header/footer
642
36.9k
        if (ret!=Z_OK) {
643
0
            hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret, &zs));
644
0
            return -1;
645
0
        }
646
36.9k
        if ((ret = deflate(&zs, Z_FINISH)) != Z_STREAM_END) {
647
0
            if (ret == Z_OK && zs.avail_out == 0) {
648
0
                deflateEnd(&zs);
649
0
                goto uncomp;
650
0
            } else {
651
0
                hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
652
0
            }
653
0
            return -1;
654
0
        }
655
        // If we used up the entire output buffer, then we either ran out of
656
        // room or we *just* fitted, but either way we may as well store
657
        // uncompressed for faster decode.
658
36.9k
        if (zs.avail_out == 0) {
659
0
            deflateEnd(&zs);
660
0
            goto uncomp;
661
0
        }
662
36.9k
        if ((ret = deflateEnd(&zs)) != Z_OK) {
663
0
            hts_log_error("Call to deflateEnd failed: %s", bgzf_zerr(ret, NULL));
664
0
            return -1;
665
0
        }
666
36.9k
        *dlen = zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
667
36.9k
    }
668
669
    // write the header
670
36.9k
    memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
671
36.9k
    packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
672
    // write the footer
673
36.9k
    crc = crc32(crc32(0L, NULL, 0L), (Bytef*)src, slen);
674
36.9k
    packInt32((uint8_t*)&dst[*dlen - 8], crc);
675
36.9k
    packInt32((uint8_t*)&dst[*dlen - 4], slen);
676
36.9k
    return 0;
677
36.9k
}
678
#endif // HAVE_LIBDEFLATE
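
Both implementations above emit the same on-disk framing; as an editor's sketch of the size arithmetic (the two BGZF_* constants are taken from htslib's bgzf.h):

    /* One BGZF block:
     *   BLOCK_HEADER_LENGTH (18)  gzip header including the "BC" extra field
     *   n                         raw deflate data for the payload
     *   BLOCK_FOOTER_LENGTH (8)   CRC32 + uncompressed length (ISIZE)
     * The whole block must fit in BGZF_MAX_BLOCK_SIZE (0x10000), which is why
     * writers hand bgzf_compress() at most BGZF_BLOCK_SIZE (0xff00) bytes of
     * payload, leaving headroom for the framing and for incompressible data
     * that is stored rather than deflated (the level-0 path above). */
    size_t max_stored_payload = BGZF_MAX_BLOCK_SIZE
                                - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH - 5;
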
679
680
static int bgzf_gzip_compress(BGZF *fp, void *_dst, size_t *dlen, const void *src, size_t slen, int level)
681
0
{
682
0
    uint8_t *dst = (uint8_t*)_dst;
683
0
    z_stream *zs = fp->gz_stream;
684
0
    int flush = slen ? Z_PARTIAL_FLUSH : Z_FINISH;
685
0
    zs->next_in   = (Bytef*)src;
686
0
    zs->avail_in  = slen;
687
0
    zs->next_out  = dst;
688
0
    zs->avail_out = *dlen;
689
0
    int ret = deflate(zs, flush);
690
0
    if (ret == Z_STREAM_ERROR) {
691
0
        hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret, NULL));
692
0
        return -1;
693
0
    }
694
0
    if (zs->avail_in != 0) {
695
0
        hts_log_error("Deflate block too large for output buffer");
696
0
        return -1;
697
0
    }
698
0
    *dlen = *dlen - zs->avail_out;
699
0
    return 0;
700
0
}
701
702
// Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
703
static int deflate_block(BGZF *fp, int block_length)
704
36.9k
{
705
36.9k
    size_t comp_size = BGZF_MAX_BLOCK_SIZE;
706
36.9k
    int ret;
707
36.9k
    if ( !fp->is_gzip )
708
36.9k
        ret = bgzf_compress(fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
709
0
    else
710
0
        ret = bgzf_gzip_compress(fp, fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
711
712
36.9k
    if ( ret != 0 )
713
0
    {
714
0
        hts_log_debug("Compression error %d", ret);
715
0
        fp->errcode |= BGZF_ERR_ZLIB;
716
0
        return -1;
717
0
    }
718
36.9k
    fp->block_offset = 0;
719
36.9k
    return comp_size;
720
36.9k
}
721
722
#ifdef HAVE_LIBDEFLATE
723
724
static int bgzf_uncompress(uint8_t *dst, size_t *dlen,
725
                           const uint8_t *src, size_t slen,
726
                           uint32_t expected_crc) {
727
    struct libdeflate_decompressor *z = libdeflate_alloc_decompressor();
728
    if (!z) {
729
        hts_log_error("Call to libdeflate_alloc_decompressor failed");
730
        return -1;
731
    }
732
733
    int ret = libdeflate_deflate_decompress(z, src, slen, dst, *dlen, dlen);
734
    libdeflate_free_decompressor(z);
735
736
    if (ret != LIBDEFLATE_SUCCESS) {
737
        hts_log_error("Inflate operation failed: %d", ret);
738
        return -1;
739
    }
740
741
    uint32_t crc = libdeflate_crc32(0, (unsigned char *)dst, *dlen);
742
#ifdef FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION
743
    // Pretend the CRC was OK so the fuzzer doesn't have to get it right
744
    crc = expected_crc;
745
#endif
746
    if (crc != expected_crc) {
747
        hts_log_error("CRC32 checksum mismatch");
748
        return -2;
749
    }
750
751
    return 0;
752
}
753
754
#else
755
756
static int bgzf_uncompress(uint8_t *dst, size_t *dlen,
757
                           const uint8_t *src, size_t slen,
758
36
                           uint32_t expected_crc) {
759
36
    z_stream zs = {
760
36
        .zalloc = NULL,
761
36
        .zfree = NULL,
762
36
        .msg = NULL,
763
36
        .next_in = (Bytef*)src,
764
36
        .avail_in = slen,
765
36
        .next_out = (Bytef*)dst,
766
36
        .avail_out = *dlen
767
36
    };
768
769
36
    int ret = inflateInit2(&zs, -15);
770
36
    if (ret != Z_OK) {
771
0
        hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret, &zs));
772
0
        return -1;
773
0
    }
774
36
    if ((ret = inflate(&zs, Z_FINISH)) != Z_STREAM_END) {
775
3
        hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
776
3
        if ((ret = inflateEnd(&zs)) != Z_OK) {
777
0
            hts_log_warning("Call to inflateEnd failed: %s", bgzf_zerr(ret, NULL));
778
0
        }
779
3
        return -1;
780
3
    }
781
33
    if ((ret = inflateEnd(&zs)) != Z_OK) {
782
0
        hts_log_error("Call to inflateEnd failed: %s", bgzf_zerr(ret, NULL));
783
0
        return -1;
784
0
    }
785
33
    *dlen = *dlen - zs.avail_out;
786
787
33
    uint32_t crc = crc32(crc32(0L, NULL, 0L), (unsigned char *)dst, *dlen);
788
33
#ifdef FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION
789
    // Pretend the CRC was OK so the fuzzer doesn't have to get it right
790
33
    crc = expected_crc;
791
33
#endif
792
33
    if (crc != expected_crc) {
793
0
        hts_log_error("CRC32 checksum mismatch");
794
0
        return -2;
795
0
    }
796
797
33
    return 0;
798
33
}
799
#endif // HAVE_LIBDEFLATE
800
801
// Inflate the block in fp->compressed_block into fp->uncompressed_block
802
static int inflate_block(BGZF* fp, int block_length)
803
36
{
804
36
    size_t dlen = BGZF_MAX_BLOCK_SIZE;
805
36
    uint32_t crc = le_to_u32((uint8_t *)fp->compressed_block + block_length-8);
806
36
    int ret = bgzf_uncompress(fp->uncompressed_block, &dlen,
807
36
                              (Bytef*)fp->compressed_block + 18,
808
36
                              block_length - 18, crc);
809
36
    if (ret < 0) {
810
3
        if (ret == -2)
811
0
            fp->errcode |= BGZF_ERR_CRC;
812
3
        else
813
3
            fp->errcode |= BGZF_ERR_ZLIB;
814
3
        return -1;
815
3
    }
816
817
33
    return dlen;
818
36
}
819
820
// Decompress the next part of a non-blocked GZIP file.
821
// Return the number of uncompressed bytes read, 0 on EOF, or a negative number on error.
822
// Will fill the output buffer unless the end of the GZIP file is reached.
823
static int inflate_gzip_block(BGZF *fp)
824
80.0k
{
825
    // we will set this to true when we detect EOF, so we don't bang against the EOF more than once per call
826
80.0k
    int input_eof = 0;
827
828
    // write to the part of the output buffer after block_offset
829
80.0k
    fp->gz_stream->next_out = (Bytef*)fp->uncompressed_block + fp->block_offset;
830
80.0k
    fp->gz_stream->avail_out = BGZF_MAX_BLOCK_SIZE - fp->block_offset;
831
832
160k
    while ( fp->gz_stream->avail_out != 0 ) {
833
        // until we fill the output buffer (or hit EOF)
834
835
81.8k
        if ( !input_eof && fp->gz_stream->avail_in == 0 ) {
836
            // we are out of input data in the buffer. Get more.
837
1.79k
            fp->gz_stream->next_in = fp->compressed_block;
838
1.79k
            int ret = hread(fp->fp, fp->compressed_block, BGZF_BLOCK_SIZE);
839
1.79k
            if ( ret < 0 ) {
840
                // hread had an error. Pass it on.
841
0
                return ret;
842
0
            }
843
1.79k
            fp->gz_stream->avail_in = ret;
844
1.79k
            if ( fp->gz_stream->avail_in < BGZF_BLOCK_SIZE ) {
845
                // we have reached EOF, but the decompressor hasn't necessarily consumed it all yet
846
1.09k
                input_eof = 1;
847
1.09k
            }
848
1.79k
        }
849
850
81.8k
        fp->gz_stream->msg = NULL;
851
        // decompress as much data as we can
852
81.8k
        int ret = inflate(fp->gz_stream, Z_SYNC_FLUSH);
853
854
81.8k
        if ( (ret < 0 && ret != Z_BUF_ERROR) || ret == Z_NEED_DICT ) {
855
            // an error occurred, other than running out of space
856
285
            hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? fp->gz_stream : NULL));
857
285
            fp->errcode |= BGZF_ERR_ZLIB;
858
285
            return -1;
859
81.5k
        } else if ( ret == Z_STREAM_END ) {
860
            // we finished a GZIP member
861
862
            // scratch for peeking to see if the file is over
863
6
            char c;
864
6
            if (fp->gz_stream->avail_in > 0 || hpeek(fp->fp, &c, 1) == 1) {
865
                // there is more data; try and read another GZIP member in the remaining data
866
6
                int reset_ret = inflateReset(fp->gz_stream);
867
6
                if (reset_ret != Z_OK) {
868
0
                    hts_log_error("Call to inflateReset failed: %s", bgzf_zerr(reset_ret, NULL));
869
0
                    fp->errcode |= BGZF_ERR_ZLIB;
870
0
                    return -1;
871
0
                }
872
6
            } else {
873
                // we consumed all the input data and hit Z_STREAM_END
874
                // so stop looping, even if we never fill the output buffer
875
0
                break;
876
0
            }
877
81.5k
        } else if ( ret == Z_BUF_ERROR && input_eof && fp->gz_stream->avail_out > 0 ) {
878
            // the gzip file has ended prematurely
879
630
            hts_log_error("Gzip file truncated");
880
630
            fp->errcode |= BGZF_ERR_IO;
881
630
            return -1;
882
630
        }
883
81.8k
    }
884
885
    // when we get here, the buffer is full or there is an EOF after a complete gzip member
886
79.1k
    return BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
887
80.0k
}
888
889
// Returns: 0 on success (BGZF header); -1 on non-BGZF GZIP header; -2 on error
890
static int check_header(const uint8_t *header)
891
1.60k
{
892
1.60k
    if ( header[0] != 31 || header[1] != 139 || header[2] != 8 ) return -2;
893
1.60k
    return ((header[3] & 4) != 0
894
957
            && unpackInt16((uint8_t*)&header[10]) == 6
895
951
            && header[12] == 'B' && header[13] == 'C'
896
1.55k
            && unpackInt16((uint8_t*)&header[14]) == 2) ? 0 : -1;
897
1.60k
}
898
899
#ifdef BGZF_CACHE
900
static void free_cache(BGZF *fp)
901
11.8k
{
902
11.8k
    khint_t k;
903
11.8k
    if (fp->is_write) return;
904
4.26k
    khash_t(cache) *h = fp->cache->h;
905
4.26k
    for (k = kh_begin(h); k < kh_end(h); ++k)
906
0
        if (kh_exist(h, k)) free(kh_val(h, k).block);
907
4.26k
    kh_destroy(cache, h);
908
4.26k
    free(fp->cache);
909
4.26k
}
910
911
static int load_block_from_cache(BGZF *fp, int64_t block_address)
912
0
{
913
0
    khint_t k;
914
0
    cache_t *p;
915
916
0
    khash_t(cache) *h = fp->cache->h;
917
0
    k = kh_get(cache, h, block_address);
918
0
    if (k == kh_end(h)) return 0;
919
0
    p = &kh_val(h, k);
920
0
    if (fp->block_length != 0) fp->block_offset = 0;
921
0
    fp->block_address = block_address;
922
0
    fp->block_length = p->size;
923
0
    memcpy(fp->uncompressed_block, p->block, p->size);
924
0
    if ( hseek(fp->fp, p->end_offset, SEEK_SET) < 0 )
925
0
    {
926
        // todo: move the error up
927
0
        hts_log_error("Could not hseek to %" PRId64, p->end_offset);
928
0
        exit(1);
929
0
    }
930
0
    return p->size;
931
0
}
932
933
static void cache_block(BGZF *fp, int size)
934
33
{
935
33
    int ret;
936
33
    khint_t k, k_orig;
937
33
    uint8_t *block = NULL;
938
33
    cache_t *p;
939
    //fprintf(stderr, "Cache block at %llx\n", (int)fp->block_address);
940
33
    khash_t(cache) *h = fp->cache->h;
941
33
    if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return;
942
0
    if (fp->block_length < 0 || fp->block_length > BGZF_MAX_BLOCK_SIZE) return;
943
0
    if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > (uint32_t)fp->cache_size) {
944
        /* Remove uniformly from any position in the hash by a simple
945
         * round-robin approach.  An alternative strategy would be to
946
         * remove the least recently accessed block, but the round-robin
947
         * removal is simpler and is not expected to have a big impact
948
         * on performance */
949
0
        if (fp->cache->last_pos >= kh_end(h)) fp->cache->last_pos = kh_begin(h);
950
0
        k_orig = k = fp->cache->last_pos;
951
0
        if (++k >= kh_end(h)) k = kh_begin(h);
952
0
        while (k != k_orig) {
953
0
            if (kh_exist(h, k))
954
0
                break;
955
0
            if (++k == kh_end(h))
956
0
                k = kh_begin(h);
957
0
        }
958
0
        fp->cache->last_pos = k;
959
960
0
        if (k != k_orig) {
961
0
            block = kh_val(h, k).block;
962
0
            kh_del(cache, h, k);
963
0
        }
964
0
    } else {
965
0
        block = (uint8_t*)malloc(BGZF_MAX_BLOCK_SIZE);
966
0
    }
967
0
    if (!block) return;
968
0
    k = kh_put(cache, h, fp->block_address, &ret);
969
0
    if (ret <= 0) { // kh_put failed, or in there already (shouldn't happen)
970
0
        free(block);
971
0
        return;
972
0
    }
973
0
    p = &kh_val(h, k);
974
0
    p->size = fp->block_length;
975
0
    p->end_offset = fp->block_address + size;
976
0
    p->block = block;
977
0
    memcpy(p->block, fp->uncompressed_block, p->size);
978
0
}
979
#else
980
static void free_cache(BGZF *fp) {}
981
static int load_block_from_cache(BGZF *fp, int64_t block_address) {return 0;}
982
static void cache_block(BGZF *fp, int size) {}
983
#endif
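
The block cache above only engages when the configured size exceeds one block; a hedged usage sketch (bgzf_set_cache_size() is declared in htslib/bgzf.h; the file name is hypothetical):

    BGZF *bg = bgzf_open("indexed.data.gz", "r");
    bgzf_set_cache_size(bg, 16 * BGZF_MAX_BLOCK_SIZE);   /* room for ~16 blocks */
    /* Random-access patterns that revisit recent blocks (e.g. via bgzf_seek)
     * can now be served from the cache instead of re-reading and re-inflating. */
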
984
985
/*
986
 * Absolute htell in this compressed file.
987
 *
988
 * Do not confuse with the external bgzf_tell macro which returns the virtual
989
 * offset.
990
 */
991
161k
static off_t bgzf_htell(BGZF *fp) {
992
161k
    if (fp->mt) {
993
0
        pthread_mutex_lock(&fp->mt->job_pool_m);
994
0
        off_t pos = fp->block_address + fp->block_clength;
995
0
        pthread_mutex_unlock(&fp->mt->job_pool_m);
996
0
        return pos;
997
161k
    } else {
998
161k
        return htell(fp->fp);
999
161k
    }
1000
161k
}
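
For contrast with the raw file position returned here, a sketch of the virtual offset that the public bgzf_tell()/bgzf_seek() interface works with, using stand-in variables (the packing is the one defined by htslib's bgzf_tell macro):

    /* compressed offset of the block start in the top 48 bits,
     * offset within the uncompressed block in the low 16 bits */
    uint64_t voffset = ((uint64_t)block_address << 16) | (block_offset & 0xffff);
    /* bgzf_seek() splits it back apart the same way: */
    uint64_t coffset = voffset >> 16;
    int      uoffset = voffset & 0xffff;
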
1001
1002
int bgzf_read_block(BGZF *fp)
1003
82.6k
{
1004
82.6k
    hts_tpool_result *r;
1005
1006
82.6k
    if (fp->errcode) return -1;
1007
1008
82.6k
    if (fp->mt) {
1009
0
    again:
1010
0
        if (fp->mt->hit_eof) {
1011
            // Further reading at EOF will always return 0
1012
0
            fp->block_length = 0;
1013
0
            return 0;
1014
0
        }
1015
0
        r = hts_tpool_next_result_wait(fp->mt->out_queue);
1016
0
        bgzf_job *j = r ? (bgzf_job *)hts_tpool_result_data(r) : NULL;
1017
1018
0
        if (!j || j->errcode == BGZF_ERR_MT) {
1019
0
            if (!fp->mt->free_block) {
1020
0
                fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
1021
0
                if (fp->uncompressed_block == NULL) return -1;
1022
0
                fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
1023
0
            } // else it's already allocated with malloc, maybe even in-use.
1024
0
            if (mt_destroy(fp->mt) < 0) {
1025
0
                fp->errcode = BGZF_ERR_IO;
1026
0
            }
1027
0
            fp->mt = NULL;
1028
0
            hts_tpool_delete_result(r, 0);
1029
0
            if (fp->errcode) {
1030
0
                return -1;
1031
0
            }
1032
0
            goto single_threaded;
1033
0
        }
1034
1035
0
        if (j->errcode) {
1036
0
            fp->errcode = j->errcode;
1037
0
            hts_log_error("BGZF decode jobs returned error %d "
1038
0
                          "for block offset %"PRId64,
1039
0
                          j->errcode, j->block_address);
1040
0
            hts_tpool_delete_result(r, 0);
1041
0
            return -1;
1042
0
        }
1043
1044
0
        if (j->hit_eof) {
1045
0
            if (!fp->last_block_eof && !fp->no_eof_block) {
1046
0
                fp->no_eof_block = 1;
1047
0
                hts_log_warning("EOF marker is absent. The input may be truncated");
1048
0
            }
1049
0
            fp->mt->hit_eof = 1;
1050
0
        }
1051
1052
        // Zero length blocks in the middle of a file are (wrongly)
1053
        // considered as EOF by many callers.  We work around this by
1054
        // trying again to see if we hit a genuine EOF.
1055
0
        if (!j->hit_eof && j->uncomp_len == 0) {
1056
0
            fp->last_block_eof = 1;
1057
0
            hts_tpool_delete_result(r, 0);
1058
0
            goto again;
1059
0
        }
1060
1061
        // block_length=0 and block_offset set by bgzf_seek.
1062
0
        if (fp->block_length != 0) fp->block_offset = 0;
1063
0
        if (!j->hit_eof) fp->block_address = j->block_address;
1064
0
        fp->block_clength = j->comp_len;
1065
0
        fp->block_length = j->uncomp_len;
1066
        // bgzf_read() can change fp->block_length
1067
0
        fp->last_block_eof = (fp->block_length == 0);
1068
1069
0
        if ( j->uncomp_len && j->fp->idx_build_otf )
1070
0
        {
1071
0
            bgzf_index_add_block(j->fp);
1072
0
            j->fp->idx->ublock_addr += j->uncomp_len;
1073
0
        }
1074
1075
        // Steal the data block as it's quicker than a memcpy.
1076
        // We just need to make sure we delay the pool free.
1077
0
        if (fp->mt->curr_job) {
1078
0
            pthread_mutex_lock(&fp->mt->job_pool_m);
1079
0
            pool_free(fp->mt->job_pool, fp->mt->curr_job);
1080
0
            pthread_mutex_unlock(&fp->mt->job_pool_m);
1081
0
        }
1082
0
        fp->uncompressed_block = j->uncomp_data;
1083
0
        fp->mt->curr_job = j;
1084
0
        if (fp->mt->free_block) {
1085
0
            free(fp->mt->free_block); // clear up last non-mt block
1086
0
            fp->mt->free_block = NULL;
1087
0
        }
1088
1089
0
        hts_tpool_delete_result(r, 0);
1090
0
        return 0;
1091
0
    }
1092
1093
82.6k
    uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
1094
82.6k
    int count, size, block_length, remaining;
1095
1096
82.6k
 single_threaded:
1097
82.6k
    size = 0;
1098
1099
82.6k
    int64_t block_address;
1100
82.6k
    block_address = bgzf_htell(fp);
1101
1102
    // Reading an uncompressed file
1103
82.6k
    if ( !fp->is_compressed )
1104
2.46k
    {
1105
2.46k
        count = hread(fp->fp, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
1106
2.46k
        if (count < 0)  // Error
1107
0
        {
1108
0
            hts_log_error("Failed to read uncompressed data "
1109
0
                          "at offset %"PRId64"%s%s",
1110
0
                          block_address, errno ? ": " : "", strerror(errno));
1111
0
            fp->errcode |= BGZF_ERR_IO;
1112
0
            return -1;
1113
0
        }
1114
2.46k
        else if (count == 0)  // EOF
1115
809
        {
1116
809
            fp->block_length = 0;
1117
809
            return 0;
1118
809
        }
1119
1.65k
        if (fp->block_length != 0) fp->block_offset = 0;
1120
1.65k
        fp->block_address = block_address;
1121
1.65k
        fp->block_length = count;
1122
1.65k
        return 0;
1123
2.46k
    }
1124
1125
    // Reading compressed file
1126
80.1k
    if ( fp->is_gzip && fp->gz_stream ) // is this an initialized gzip stream?
1127
78.4k
    {
1128
78.4k
        count = inflate_gzip_block(fp);
1129
78.4k
        if ( count<0 )
1130
901
        {
1131
901
            hts_log_error("Reading GZIP stream failed at offset %"PRId64,
1132
901
                          block_address);
1133
901
            fp->errcode |= BGZF_ERR_ZLIB;
1134
901
            return -1;
1135
901
        }
1136
77.5k
        fp->block_length = count;
1137
77.5k
        fp->block_address = block_address;
1138
77.5k
        return 0;
1139
78.4k
    }
1140
1.67k
    if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
1141
1142
    // loop to skip empty bgzf blocks
1143
1.67k
    while (1)
1144
1.67k
    {
1145
1.67k
        count = hread(fp->fp, header, sizeof(header));
1146
1.67k
        if (count == 0) { // no data read
1147
69
            if (!fp->last_block_eof && !fp->no_eof_block && !fp->is_gzip) {
1148
30
                fp->no_eof_block = 1;
1149
30
                hts_log_warning("EOF marker is absent. The input may be truncated");
1150
30
            }
1151
69
            fp->block_length = 0;
1152
69
            return 0;
1153
69
        }
1154
1.60k
        int ret = 0;
1155
1.60k
        if ( count != sizeof(header) || (ret=check_header(header))==-2 )
1156
3
        {
1157
3
            fp->errcode |= BGZF_ERR_HEADER;
1158
3
            hts_log_error("%s BGZF header at offset %"PRId64,
1159
3
                          ret ? "Invalid" : "Failed to read",
1160
3
                          block_address);
1161
3
            return -1;
1162
3
        }
1163
1.60k
        if ( ret==-1 )
1164
1.55k
        {
1165
            // GZIP, not BGZF
1166
1.55k
            uint8_t *cblock = (uint8_t*)fp->compressed_block;
1167
1.55k
            memcpy(cblock, header, sizeof(header));
1168
1.55k
            count = hread(fp->fp, cblock+sizeof(header), BGZF_BLOCK_SIZE - sizeof(header)) + sizeof(header);
1169
1170
1.55k
            fp->is_gzip = 1;
1171
1.55k
            fp->gz_stream = (z_stream*) calloc(1,sizeof(z_stream));
1172
            // Set up zlib, using a window size of 15, and its built-in GZIP header processing (+16).
1173
1.55k
            int ret = inflateInit2(fp->gz_stream, 15 + 16);
1174
1.55k
            if (ret != Z_OK)
1175
0
            {
1176
0
                hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret, fp->gz_stream));
1177
0
                fp->errcode |= BGZF_ERR_ZLIB;
1178
0
                return -1;
1179
0
            }
1180
1.55k
            fp->gz_stream->avail_in = count;
1181
1.55k
            fp->gz_stream->next_in  = cblock;
1182
1.55k
            count = inflate_gzip_block(fp);
1183
1.55k
            if ( count<0 )
1184
14
            {
1185
14
                hts_log_error("Reading GZIP stream failed at offset %"PRId64,
1186
14
                              block_address);
1187
14
                fp->errcode |= BGZF_ERR_ZLIB;
1188
14
                return -1;
1189
14
            }
1190
1.54k
            fp->block_length = count;
1191
1.54k
            fp->block_address = block_address;
1192
1.54k
            if ( fp->idx_build_otf ) return -1; // cannot build index for gzip
1193
1.54k
            return 0;
1194
1.54k
        }
1195
48
        size = count;
1196
48
        block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1197
48
        if (block_length < BLOCK_HEADER_LENGTH)
1198
0
        {
1199
0
            hts_log_error("Invalid BGZF block length at offset %"PRId64,
1200
0
                          block_address);
1201
0
            fp->errcode |= BGZF_ERR_HEADER;
1202
0
            return -1;
1203
0
        }
1204
48
        compressed_block = (uint8_t*)fp->compressed_block;
1205
48
        memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1206
48
        remaining = block_length - BLOCK_HEADER_LENGTH;
1207
48
        count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1208
48
        if (count != remaining) {
1209
12
            hts_log_error("Failed to read BGZF block data at offset %"PRId64
1210
12
                          " expected %d bytes; hread returned %d",
1211
12
                          block_address, remaining, count);
1212
12
            fp->errcode |= BGZF_ERR_IO;
1213
12
            return -1;
1214
12
        }
1215
36
        size += count;
1216
36
        if ((count = inflate_block(fp, block_length)) < 0) {
1217
3
            hts_log_debug("Inflate block operation failed for "
1218
3
                          "block at offset %"PRId64": %s",
1219
3
                          block_address, bgzf_zerr(count, NULL));
1220
3
            fp->errcode |= BGZF_ERR_ZLIB;
1221
3
            return -1;
1222
3
        }
1223
33
        fp->last_block_eof = (count == 0);
1224
33
        if ( count ) break;     // otherwise an empty bgzf block
1225
0
        block_address = bgzf_htell(fp); // update for new block start
1226
0
    }
1227
33
    if (fp->block_length != 0) fp->block_offset = 0; // Do not reset offset if this read follows a seek.
1228
33
    fp->block_address = block_address;
1229
33
    fp->block_length = count;
1230
33
    if ( fp->idx_build_otf )
1231
0
    {
1232
0
        bgzf_index_add_block(fp);
1233
0
        fp->idx->ublock_addr += count;
1234
0
    }
1235
33
    cache_block(fp, size);
1236
33
    return 0;
1237
1.67k
}
1238
1239
ssize_t bgzf_read(BGZF *fp, void *data, size_t length)
1240
7.64k
{
1241
7.64k
    ssize_t bytes_read = 0;
1242
7.64k
    uint8_t *output = (uint8_t*)data;
1243
7.64k
    if (length <= 0) return 0;
1244
7.64k
    assert(fp->is_write == 0);
1245
13.9k
    while (bytes_read < length) {
1246
7.25k
        int copy_length, available = fp->block_length - fp->block_offset;
1247
7.25k
        uint8_t *buffer;
1248
7.25k
        if (available <= 0) {
1249
1.43k
            int ret = bgzf_read_block(fp);
1250
1.43k
            if (ret != 0) {
1251
24
                hts_log_error("Read block operation failed with error %d after %zd of %zu bytes", fp->errcode, bytes_read, length);
1252
24
                fp->errcode |= BGZF_ERR_ZLIB;
1253
24
                return -1;
1254
24
            }
1255
1.40k
            available = fp->block_length - fp->block_offset;
1256
1.40k
            if (available == 0) {
1257
227
                if (fp->block_length == 0)
1258
227
                    break; // EOF
1259
1260
                // Offset was at end of block (see commit e9863a0)
1261
0
                fp->block_address = bgzf_htell(fp);
1262
0
                fp->block_offset = fp->block_length = 0;
1263
0
                continue;
1264
1.17k
            } else if (available < 0) {
1265
                // Block offset was set to an invalid coordinate
1266
0
                hts_log_error("BGZF block offset %d set beyond block size %d",
1267
0
                              fp->block_offset, fp->block_length);
1268
0
                fp->errcode |= BGZF_ERR_MISUSE;
1269
0
                return -1;
1270
0
            }
1271
1.40k
        }
1272
6.99k
        copy_length = length - bytes_read < available? length - bytes_read : available;
1273
6.99k
        buffer = (uint8_t*)fp->uncompressed_block;
1274
6.99k
        memcpy(output, buffer + fp->block_offset, copy_length);
1275
6.99k
        fp->block_offset += copy_length;
1276
6.99k
        output += copy_length;
1277
6.99k
        bytes_read += copy_length;
1278
1279
        // For raw gzip streams this avoids short reads.
1280
6.99k
        if (fp->block_offset == fp->block_length) {
1281
400
            fp->block_address = bgzf_htell(fp);
1282
400
            fp->block_offset = fp->block_length = 0;
1283
400
        }
1284
6.99k
    }
1285
1286
6.93k
    fp->uncompressed_address += bytes_read;
1287
1288
6.93k
    return bytes_read;
1289
6.95k
}
1290
1291
// -1 for EOF, -2 for error, 0-255 for byte.
1292
347k
int bgzf_peek(BGZF *fp) {
1293
347k
    int available = fp->block_length - fp->block_offset;
1294
347k
    if (available <= 0) {
1295
12
        if (bgzf_read_block(fp) < 0) {
1296
3
            hts_log_error("Read block operation failed with error %d", fp->errcode);
1297
3
            fp->errcode = BGZF_ERR_ZLIB;
1298
3
            return -2;
1299
3
        }
1300
12
    }
1301
347k
    available = fp->block_length - fp->block_offset;
1302
347k
    if (available)
1303
347k
        return ((unsigned char *)fp->uncompressed_block)[fp->block_offset];
1304
1305
0
    return -1;
1306
347k
}
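
A small usage sketch (editor's illustration; fp is assumed to be an open BGZF reader): the byte is reported without being consumed, so a following bgzf_read() still returns it.

    int c = bgzf_peek(fp);
    if (c >= 0) {
        /* c is the next byte; fp->block_offset was not advanced */
    } else if (c == -1) {
        /* end of file */
    } else {
        /* c == -2: read/decode error, details in fp->errcode */
    }
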
1307
1308
ssize_t bgzf_raw_read(BGZF *fp, void *data, size_t length)
1309
0
{
1310
0
    ssize_t ret = hread(fp->fp, data, length);
1311
0
    if (ret < 0) fp->errcode |= BGZF_ERR_IO;
1312
0
    return ret;
1313
0
}
1314
1315
#ifdef BGZF_MT
1316
1317
/* Function to clean up when jobs are discarded (e.g. during seek)
1318
 * This works for results too, as results are the same struct with
1319
 * decompressed data stored in it. */
1320
0
static void job_cleanup(void *arg) {
1321
0
    bgzf_job *j = (bgzf_job *)arg;
1322
0
    mtaux_t *mt = j->fp->mt;
1323
0
    pthread_mutex_lock(&mt->job_pool_m);
1324
0
    pool_free(mt->job_pool, j);
1325
0
    pthread_mutex_unlock(&mt->job_pool_m);
1326
0
}
1327
1328
0
static void *bgzf_encode_func(void *arg) {
1329
0
    bgzf_job *j = (bgzf_job *)arg;
1330
1331
0
    j->comp_len = BGZF_MAX_BLOCK_SIZE;
1332
0
    int ret = bgzf_compress(j->comp_data, &j->comp_len,
1333
0
                            j->uncomp_data, j->uncomp_len,
1334
0
                            j->fp->compress_level);
1335
0
    if (ret != 0)
1336
0
        j->errcode |= BGZF_ERR_ZLIB;
1337
1338
0
    return arg;
1339
0
}
1340
1341
// Optimisation for compression level 0 (uncompressed deflate blocks)
1342
// Avoids memcpy of the data from uncompressed to compressed buffer.
1343
0
static void *bgzf_encode_level0_func(void *arg) {
1344
0
    bgzf_job *j = (bgzf_job *)arg;
1345
0
    uint32_t crc;
1346
0
    j->comp_len = j->uncomp_len + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH + 5;
1347
1348
    // Data will have already been copied in to
1349
    // j->comp_data + BLOCK_HEADER_LENGTH + 5
1350
1351
    // Add preamble
1352
0
    memcpy(j->comp_data, g_magic, BLOCK_HEADER_LENGTH);
1353
0
    u16_to_le(j->comp_len-1, j->comp_data + 16);
1354
1355
    // Deflate uncompressed data header
1356
0
    j->comp_data[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
1357
0
    u16_to_le(j->uncomp_len, j->comp_data + BLOCK_HEADER_LENGTH + 1);
1358
0
    u16_to_le(~j->uncomp_len, j->comp_data + BLOCK_HEADER_LENGTH + 3);
1359
1360
    // Trailer (CRC, uncompressed length)
1361
0
    crc = hts_crc32(0, j->comp_data + BLOCK_HEADER_LENGTH + 5, j->uncomp_len);
1362
0
    u32_to_le(crc, j->comp_data +  j->comp_len - 8);
1363
0
    u32_to_le(j->uncomp_len, j->comp_data + j->comp_len - 4);
1364
1365
0
    return arg;
1366
0
}
1367
1368
// Our input block has already been decoded by bgzf_mt_read_block().
1369
// We need to split that into a fetch block (compressed) and make this
1370
// do the actual decompression step.
1371
0
static void *bgzf_decode_func(void *arg) {
1372
0
    bgzf_job *j = (bgzf_job *)arg;
1373
1374
0
    j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1375
0
    uint32_t crc = le_to_u32((uint8_t *)j->comp_data + j->comp_len-8);
1376
0
    int ret = bgzf_uncompress(j->uncomp_data, &j->uncomp_len,
1377
0
                              j->comp_data+18, j->comp_len-18, crc);
1378
0
    if (ret != 0)
1379
0
        j->errcode |= BGZF_ERR_ZLIB;
1380
1381
0
    return arg;
1382
0
}
1383
1384
/*
1385
 * Nul function so we can dispatch a job with the correct serial
1386
 * to mark failure or to indicate an empty read (EOF).
1387
 */
1388
0
static void *bgzf_nul_func(void *arg) { return arg; }
1389
1390
/*
1391
 * Takes compressed blocks off the results queue and calls hwrite to
1392
 * punt them to the output stream.
1393
 *
1394
 * Returns NULL when no more are left, or -1 on error
1395
 */
1396
0
static void *bgzf_mt_writer(void *vp) {
1397
0
    BGZF *fp = (BGZF *)vp;
1398
0
    mtaux_t *mt = fp->mt;
1399
0
    hts_tpool_result *r;
1400
1401
0
    if (fp->idx_build_otf) {
1402
0
        fp->idx->moffs = fp->idx->noffs = 1;
1403
0
        fp->idx->offs = (bgzidx1_t*) calloc(fp->idx->moffs, sizeof(bgzidx1_t));
1404
0
        if (!fp->idx->offs) goto err;
1405
0
    }
1406
1407
    // Iterates until result queue is shutdown, where it returns NULL.
1408
0
    while ((r = hts_tpool_next_result_wait(mt->out_queue))) {
1409
0
        bgzf_job *j = (bgzf_job *)hts_tpool_result_data(r);
1410
0
        assert(j);
1411
1412
0
        if (fp->idx_build_otf) {
1413
0
            fp->idx->noffs++;
1414
0
            if ( fp->idx->noffs > fp->idx->moffs )
1415
0
            {
1416
0
                fp->idx->moffs = fp->idx->noffs;
1417
0
                kroundup32(fp->idx->moffs);
1418
0
                fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
1419
0
                if ( !fp->idx->offs ) goto err;
1420
0
            }
1421
0
            fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->offs[ fp->idx->noffs-2 ].uaddr + j->uncomp_len;
1422
0
            fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->idx->offs[ fp->idx->noffs-2 ].caddr + j->comp_len;
1423
0
        }
1424
1425
        // Flush any cached hts_idx_push calls
1426
0
        if (bgzf_idx_flush(fp, j->uncomp_len, j->comp_len) < 0)
1427
0
            goto err;
1428
1429
0
        if (hwrite(fp->fp, j->comp_data, j->comp_len) != j->comp_len)
1430
0
            goto err;
1431
1432
        // Update our local block_address.  We cannot update fp->block_address
1433
        // directly because bgzf_tell reads it without taking a lock.
1434
0
        pthread_mutex_lock(&mt->idx_m);
1435
0
        mt->block_address += j->comp_len;
1436
0
        pthread_mutex_unlock(&mt->idx_m);
1437
1438
        /*
1439
         * Periodically call hflush (which calls fsync when on a file).
1440
         * This avoids the fsync being done at the bgzf_close stage,
1441
         * which can sometimes cause significant delays.  As this is in
1442
         * a separate thread, spreading the sync delays throughout the
1443
         * program execution seems better.
1444
         * Frequency of 1/512 has been chosen by experimentation
1445
         * across local XFS, NFS and Lustre tests.
1446
         */
1447
0
        if (++mt->flush_pending % 512 == 0)
1448
0
            if (hflush(fp->fp) != 0)
1449
0
                goto err;
1450
1451
1452
0
        hts_tpool_delete_result(r, 0);
1453
1454
        // Also updated by main thread
1455
0
        pthread_mutex_lock(&mt->job_pool_m);
1456
0
        pool_free(mt->job_pool, j);
1457
0
        mt->jobs_pending--;
1458
0
        pthread_mutex_unlock(&mt->job_pool_m);
1459
0
    }
1460
1461
0
    if (hflush(fp->fp) != 0)
1462
0
        goto err;
1463
1464
0
    hts_tpool_process_destroy(mt->out_queue);
1465
1466
0
    return NULL;
1467
1468
0
 err:
1469
0
    hts_tpool_process_destroy(mt->out_queue);
1470
0
    return (void *)-1;
1471
0
}
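The function above is the consumer half of an hts_tpool pipeline: worker threads push finished jobs onto mt->out_queue in dispatch order and this thread drains them in sequence. Stripped of the BGZF specifics, the pattern looks roughly like the sketch below; handle_job() is an illustrative callback, not part of htslib.

/* Illustrative sketch: ordered consumer loop over an hts_tpool output queue. */
#include "htslib/thread_pool.h"

static int drain_queue(hts_tpool_process *q, int (*handle_job)(void *job))
{
    hts_tpool_result *r;
    while ((r = hts_tpool_next_result_wait(q)) != NULL) {
        void *job = hts_tpool_result_data(r);   /* output of the worker func */
        int ret = handle_job(job);              /* e.g. write it out */
        hts_tpool_delete_result(r, 0);          /* 0: do not free the job data */
        if (ret < 0) return -1;
    }
    return 0;  /* queue has been shut down; no more results will arrive */
}
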
1472
1473
1474
/*
1475
 * Reads a compressed block of data using hread, ready for the caller to
1477
 * dispatch to the thread pool for decompression.  This is the analogue of the old
1477
 * non-threaded bgzf_read_block() function, but without modifying fp
1478
 * in any way (except for the read offset).  All output goes via the
1479
 * supplied bgzf_job struct.
1480
 *
1481
 * Returns 0 on success, or -1 on error or end of file.
1482
 */
1483
int bgzf_mt_read_block(BGZF *fp, bgzf_job *j)
1484
0
{
1485
0
    uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
1486
0
    int count, block_length, remaining;
1487
1488
    // NOTE: Guaranteed to be compressed as we block multi-threading in
1489
    // uncompressed mode.  However it may be gzip compression instead
1490
    // of bgzf.
1491
1492
    // Reading compressed file
1493
0
    int64_t block_address;
1494
0
    block_address = htell(fp->fp);
1495
1496
0
    j->block_address = block_address;  // in case we exit with j->errcode
1497
1498
0
    if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
1499
0
    count = hpeek(fp->fp, header, sizeof(header));
1500
0
    if (count == 0) // no data read
1501
0
        return -1;
1502
0
    int ret;
1503
0
    if ( count != sizeof(header) || (ret=check_header(header))==-2 )
1504
0
    {
1505
0
        j->errcode |= BGZF_ERR_HEADER;
1506
0
        return -1;
1507
0
    }
1508
0
    if (ret == -1) {
1509
0
        j->errcode |= BGZF_ERR_MT;
1510
0
        return -1;
1511
0
    }
1512
1513
0
    count = hread(fp->fp, header, sizeof(header));
1514
0
    if (count != sizeof(header)) // no data read
1515
0
        return -1;
1516
1517
0
    block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1518
0
    if (block_length < BLOCK_HEADER_LENGTH) {
1519
0
        j->errcode |= BGZF_ERR_HEADER;
1520
0
        return -1;
1521
0
    }
1522
0
    compressed_block = (uint8_t*)j->comp_data;
1523
0
    memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1524
0
    remaining = block_length - BLOCK_HEADER_LENGTH;
1525
0
    count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1526
0
    if (count != remaining) {
1527
0
        j->errcode |= BGZF_ERR_IO;
1528
0
        return -1;
1529
0
    }
1530
0
    j->comp_len = block_length;
1531
0
    j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1532
0
    j->block_address = block_address;
1533
0
    j->fp = fp;
1534
0
    j->errcode = 0;
1535
1536
0
    return 0;
1537
0
}
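The "+ 1" above comes from the BGZF specification: the BSIZE field at bytes 16-17 of the 18-byte header stores the total block length minus one, little endian. A small illustrative sketch of the same arithmetic on a raw header buffer:

/* Illustrative sketch: recover the on-disk block length from a BGZF header. */
#include <stdint.h>

static int block_length_from_header(const uint8_t header[18])
{
    int block_length = (header[16] | (header[17] << 8)) + 1;  /* BSIZE + 1 */
    /* A valid block must at least cover its own 18-byte header. */
    return block_length >= 18 ? block_length : -1;
}
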
1538
1539
1540
static int bgzf_check_EOF_common(BGZF *fp)
1541
780
{
1542
780
    uint8_t buf[28];
1543
780
    off_t offset = htell(fp->fp);
1544
780
    if (hseek(fp->fp, -28, SEEK_END) < 0) {
1545
0
        if (errno == ESPIPE) { hclearerr(fp->fp); return 2; }
1546
#ifdef _WIN32
1547
        if (errno == EINVAL) { hclearerr(fp->fp); return 2; }
1548
#else
1549
        // Assume that EINVAL was due to the file being less than 28 bytes
1550
        // long, rather than being a random error return from an hfile backend.
1551
        // This should be reported as "no EOF block" rather than an error.
1552
0
        if (errno == EINVAL) { hclearerr(fp->fp); return 0; }
1553
0
#endif
1554
0
        return -1;
1555
0
    }
1556
780
    if ( hread(fp->fp, buf, 28) != 28 ) return -1;
1557
780
    if ( hseek(fp->fp, offset, SEEK_SET) < 0 ) return -1;
1558
780
    return (memcmp("\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", buf, 28) == 0)? 1 : 0;
1559
780
}
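The 28-byte constant compared above is the empty BGZF block that writers append as an EOF marker. The library performs the check through hFILE as shown; purely as an illustration, the same test on an ordinary file with stdio might look like the sketch below (meaningful only for files of at least 28 bytes):

/* Illustrative sketch: look for the BGZF EOF marker at the end of a file. */
#include <stdio.h>
#include <string.h>

static const unsigned char bgzf_eof_marker[28] =
    "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0";

static int has_bgzf_eof_marker(const char *fn)
{
    unsigned char buf[28];
    FILE *f = fopen(fn, "rb");
    if (!f) return -1;
    if (fseek(f, -28L, SEEK_END) != 0 || fread(buf, 1, 28, f) != 28) {
        fclose(f);
        return -1;
    }
    fclose(f);
    return memcmp(buf, bgzf_eof_marker, 28) == 0;
}
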
1560
1561
/*
1562
 * Checks EOF from the reader thread.
1563
 */
1564
0
static void bgzf_mt_eof(BGZF *fp) {
1565
0
    mtaux_t *mt = fp->mt;
1566
1567
0
    pthread_mutex_lock(&mt->job_pool_m);
1568
0
    mt->eof = bgzf_check_EOF_common(fp);
1569
0
    pthread_mutex_unlock(&mt->job_pool_m);
1570
0
    mt->command = HAS_EOF_DONE;
1571
0
    pthread_cond_signal(&mt->command_c);
1572
0
}
1573
1574
1575
/*
1576
 * Performs the seek (called by reader thread).
1577
 *
1578
 * This simply drains the entire queue (throwing away blocks), seeks,
1579
 * and starts it up again.  Brute force, but maybe sufficient.
1580
 */
1581
0
static void bgzf_mt_seek(BGZF *fp) {
1582
0
    mtaux_t *mt = fp->mt;
1583
1584
0
    hts_tpool_process_reset(mt->out_queue, 0);
1585
0
    pthread_mutex_lock(&mt->job_pool_m);
1586
0
    mt->errcode = 0;
1587
1588
0
    if (hseek(fp->fp, mt->block_address, SEEK_SET) < 0)
1589
0
        mt->errcode = errno;
1590
1591
0
    pthread_mutex_unlock(&mt->job_pool_m);
1592
0
    mt->command = SEEK_DONE;
1593
0
    pthread_cond_signal(&mt->command_c);
1594
0
}
1595
1596
0
static void *bgzf_mt_reader(void *vp) {
1597
0
    BGZF *fp = (BGZF *)vp;
1598
0
    mtaux_t *mt = fp->mt;
1599
1600
0
restart:
1601
0
    pthread_mutex_lock(&mt->job_pool_m);
1602
0
    bgzf_job *j = pool_alloc(mt->job_pool);
1603
0
    pthread_mutex_unlock(&mt->job_pool_m);
1604
0
    if (!j) goto err;
1605
0
    j->errcode = 0;
1606
0
    j->comp_len = 0;
1607
0
    j->uncomp_len = 0;
1608
0
    j->hit_eof = 0;
1609
0
    j->fp = fp;
1610
1611
0
    while (bgzf_mt_read_block(fp, j) == 0) {
1612
        // Dispatch
1613
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_decode_func, j,
1614
0
                                job_cleanup, job_cleanup, 0) < 0) {
1615
0
            job_cleanup(j);
1616
0
            goto err;
1617
0
        }
1618
1619
        // Check for command
1620
0
        pthread_mutex_lock(&mt->command_m);
1621
0
        switch (mt->command) {
1622
0
        case SEEK:
1623
0
            bgzf_mt_seek(fp);  // Sets mt->command to SEEK_DONE
1624
0
            pthread_mutex_unlock(&mt->command_m);
1625
0
            goto restart;
1626
1627
0
        case HAS_EOF:
1628
0
            bgzf_mt_eof(fp);   // Sets mt->command to HAS_EOF_DONE
1629
0
            break;
1630
1631
0
        case SEEK_DONE:
1632
0
        case HAS_EOF_DONE:
1633
0
            pthread_cond_signal(&mt->command_c);
1634
0
            break;
1635
1636
0
        case CLOSE:
1637
0
            pthread_cond_signal(&mt->command_c);
1638
0
            pthread_mutex_unlock(&mt->command_m);
1639
0
            hts_tpool_process_destroy(mt->out_queue);
1640
0
            return NULL;
1641
1642
0
        default:
1643
0
            break;
1644
0
        }
1645
0
        pthread_mutex_unlock(&mt->command_m);
1646
1647
        // Allocate buffer for next block
1648
0
        pthread_mutex_lock(&mt->job_pool_m);
1649
0
        j = pool_alloc(mt->job_pool);
1650
0
        pthread_mutex_unlock(&mt->job_pool_m);
1651
0
        if (!j) {
1652
0
            hts_tpool_process_destroy(mt->out_queue);
1653
0
            return NULL;
1654
0
        }
1655
0
        j->errcode = 0;
1656
0
        j->comp_len = 0;
1657
0
        j->uncomp_len = 0;
1658
0
        j->hit_eof = 0;
1659
0
        j->fp = fp;
1660
0
    }
1661
1662
0
    if (j->errcode == BGZF_ERR_MT) {
1663
        // A raw gzip stream cannot be decoded using multiple threads.
1664
        // We tear down the multi-threaded decoder and revert to the old code.
1665
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_nul_func, j,
1666
0
                                job_cleanup, job_cleanup, 0) < 0) {
1667
0
            job_cleanup(j);
1668
0
            hts_tpool_process_destroy(mt->out_queue);
1669
0
            return NULL;
1670
0
        }
1671
0
        hts_tpool_process_ref_decr(mt->out_queue);
1672
0
        return &j->errcode;
1673
0
    }
1674
1675
    // Dispatch an empty block so EOF is spotted.
1676
    // We also use this mechanism for returning errors, in which case
1677
    // j->errcode is set already.
1678
1679
0
    j->hit_eof = 1;
1680
0
    if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_nul_func, j,
1681
0
                            job_cleanup, job_cleanup, 0) < 0) {
1682
0
        job_cleanup(j);
1683
0
        hts_tpool_process_destroy(mt->out_queue);
1684
0
        return NULL;
1685
0
    }
1686
0
    if (j->errcode != 0) {
1687
0
        hts_tpool_process_destroy(mt->out_queue);
1688
0
        return &j->errcode;
1689
0
    }
1690
1691
    // We hit EOF so can stop reading, but we may get a subsequent
1692
    // seek request.  In this case we need to restart the reader.
1693
    //
1694
    // To handle this we wait on a condition variable and then
1695
    // monitor the command. (This could be either seek or close.)
1696
0
    for (;;) {
1697
0
        pthread_mutex_lock(&mt->command_m);
1698
0
        if (mt->command == NONE)
1699
0
            pthread_cond_wait(&mt->command_c, &mt->command_m);
1700
0
        switch(mt->command) {
1701
0
        default:
1702
0
            pthread_mutex_unlock(&mt->command_m);
1703
0
            break;
1704
1705
0
        case SEEK:
1706
0
            bgzf_mt_seek(fp);
1707
0
            pthread_mutex_unlock(&mt->command_m);
1708
0
            goto restart;
1709
1710
0
        case HAS_EOF:
1711
0
            bgzf_mt_eof(fp);   // Sets mt->command to HAS_EOF_DONE
1712
0
            pthread_mutex_unlock(&mt->command_m);
1713
0
            break;
1714
1715
0
        case SEEK_DONE:
1716
0
        case HAS_EOF_DONE:
1717
0
            pthread_cond_signal(&mt->command_c);
1718
0
            pthread_mutex_unlock(&mt->command_m);
1719
0
            break;
1720
1721
0
        case CLOSE:
1722
0
            pthread_cond_signal(&mt->command_c);
1723
0
            pthread_mutex_unlock(&mt->command_m);
1724
0
            hts_tpool_process_destroy(mt->out_queue);
1725
0
            return NULL;
1726
0
        }
1727
0
    }
1728
1729
0
 err:
1730
0
    pthread_mutex_lock(&mt->command_m);
1731
0
    mt->command = CLOSE;
1732
0
    pthread_cond_signal(&mt->command_c);
1733
0
    pthread_mutex_unlock(&mt->command_m);
1734
0
    hts_tpool_process_destroy(mt->out_queue);
1735
0
    return NULL;
1736
0
}
1737
1738
0
int bgzf_thread_pool(BGZF *fp, hts_tpool *pool, int qsize) {
1739
    // No gain from multi-threading when not compressed
1740
0
    if (!fp->is_compressed)
1741
0
        return 0;
1742
1743
0
    mtaux_t *mt;
1744
0
    mt = (mtaux_t*)calloc(1, sizeof(mtaux_t));
1745
0
    if (!mt) return -1;
1746
0
    fp->mt = mt;
1747
1748
0
    mt->pool = pool;
1749
0
    mt->n_threads = hts_tpool_size(pool);
1750
0
    if (!qsize)
1751
0
        qsize = mt->n_threads*2;
1752
0
    if (!(mt->out_queue = hts_tpool_process_init(mt->pool, qsize, 0)))
1753
0
        goto err;
1754
0
    hts_tpool_process_ref_incr(mt->out_queue);
1755
1756
0
    mt->job_pool = pool_create(sizeof(bgzf_job));
1757
0
    if (!mt->job_pool)
1758
0
        goto err;
1759
1760
0
    pthread_mutex_init(&mt->job_pool_m, NULL);
1761
0
    pthread_mutex_init(&mt->command_m, NULL);
1762
0
    pthread_mutex_init(&mt->idx_m, NULL);
1763
0
    pthread_cond_init(&mt->command_c, NULL);
1764
0
    mt->flush_pending = 0;
1765
0
    mt->jobs_pending = 0;
1766
0
    mt->free_block = fp->uncompressed_block; // currently in-use block
1767
0
    mt->block_address = fp->block_address;
1768
0
    pthread_create(&mt->io_task, NULL,
1769
0
                   fp->is_write ? bgzf_mt_writer : bgzf_mt_reader, fp);
1770
1771
0
    return 0;
1772
1773
0
 err:
1774
0
    free(mt);
1775
0
    fp->mt = NULL;
1776
0
    return -1;
1777
0
}
1778
1779
int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1780
0
{
1781
    // No gain from multi-threading when not compressed
1782
0
    if (!fp->is_compressed || fp->is_gzip)
1783
0
        return 0;
1784
1785
0
    if (n_threads < 1) return -1;
1786
0
    hts_tpool *p = hts_tpool_init(n_threads);
1787
0
    if (!p)
1788
0
        return -1;
1789
1790
0
    if (bgzf_thread_pool(fp, p, 0) != 0) {
1791
0
        hts_tpool_destroy(p);
1792
0
        return -1;
1793
0
    }
1794
1795
0
    fp->mt->own_pool = 1;
1796
1797
0
    return 0;
1798
0
}
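From an application's point of view the two entry points differ only in pool ownership: bgzf_mt() creates a private pool of n_threads workers (the n_sub_blks argument is not used above), while bgzf_thread_pool() attaches a caller-owned hts_tpool that may be shared between several files. A hedged usage sketch, with an illustrative file name:

/* Illustrative sketch: enabling multi-threaded BGZF I/O ("example.vcf.gz"
 * is a made-up path). */
#include "htslib/bgzf.h"
#include "htslib/thread_pool.h"

static int read_with_private_pool(void)
{
    BGZF *fp = bgzf_open("example.vcf.gz", "r");
    if (!fp) return -1;
    if (bgzf_mt(fp, 4, 0) < 0) {             /* 4 worker threads, private pool */
        bgzf_close(fp);
        return -1;
    }
    /* ... bgzf_read()/bgzf_getline() as usual ... */
    return bgzf_close(fp);
}

static int read_with_shared_pool(hts_tpool *pool)
{
    BGZF *fp = bgzf_open("example.vcf.gz", "r");
    if (!fp) return -1;
    if (bgzf_thread_pool(fp, pool, 0) < 0) { /* 0 => default queue size */
        bgzf_close(fp);
        return -1;
    }
    return bgzf_close(fp);
}
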
1799
1800
static int mt_destroy(mtaux_t *mt)
1801
0
{
1802
0
    int ret = 0;
1803
1804
    // Tell the reader to shut down
1805
0
    pthread_mutex_lock(&mt->command_m);
1806
0
    mt->command = CLOSE;
1807
0
    pthread_cond_signal(&mt->command_c);
1808
0
    hts_tpool_wake_dispatch(mt->out_queue); // unstick the reader
1809
0
    pthread_mutex_unlock(&mt->command_m);
1810
1811
    // Check for thread worker failure, indicated by is_shutdown returning 2
1812
    // It's possible really late errors might be missed, but we can live with
1813
    // that.
1814
0
    ret = -(hts_tpool_process_is_shutdown(mt->out_queue) > 1);
1815
    // Destroying the queue first forces the writer to exit.
1816
    // mt->out_queue is reference counted, so destroy gets called in both
1817
    // this thread and the IO thread.  The last to do so will clean up.
1818
0
    hts_tpool_process_destroy(mt->out_queue);
1819
1820
    // IO thread will now exit.  Wait for it and perform final clean-up.
1821
    // If it returned non-NULL, it was not happy.
1822
0
    void *retval = NULL;
1823
0
    pthread_join(mt->io_task, &retval);
1824
0
    ret = retval != NULL ? -1 : ret;
1825
1826
0
    pthread_mutex_destroy(&mt->job_pool_m);
1827
0
    pthread_mutex_destroy(&mt->command_m);
1828
0
    pthread_mutex_destroy(&mt->idx_m);
1829
0
    pthread_cond_destroy(&mt->command_c);
1830
0
    if (mt->curr_job)
1831
0
        pool_free(mt->job_pool, mt->curr_job);
1832
1833
0
    if (mt->own_pool)
1834
0
        hts_tpool_destroy(mt->pool);
1835
1836
0
    pool_destroy(mt->job_pool);
1837
1838
0
    if (mt->idx_cache.e)
1839
0
        free(mt->idx_cache.e);
1840
1841
0
    free(mt);
1842
0
    fflush(stderr);
1843
1844
0
    return ret;
1845
0
}
1846
1847
static int mt_queue(BGZF *fp)
1848
0
{
1849
0
    mtaux_t *mt = fp->mt;
1850
1851
0
    mt->block_number++;
1852
1853
    // Also updated by writer thread
1854
0
    pthread_mutex_lock(&mt->job_pool_m);
1855
0
    bgzf_job *j = pool_alloc(mt->job_pool);
1856
0
    if (j) mt->jobs_pending++;
1857
0
    pthread_mutex_unlock(&mt->job_pool_m);
1858
0
    if (!j) return -1;
1859
1860
0
    j->fp = fp;
1861
0
    j->errcode = 0;
1862
0
    j->uncomp_len  = fp->block_offset;
1863
0
    if (fp->compress_level == 0) {
1864
0
        memcpy(j->comp_data + BLOCK_HEADER_LENGTH + 5, fp->uncompressed_block,
1865
0
               j->uncomp_len);
1866
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue,
1867
0
                                bgzf_encode_level0_func, j,
1868
0
                                job_cleanup, job_cleanup, 0) < 0) {
1869
0
            goto fail;
1870
0
        }
1871
0
    } else {
1872
0
        memcpy(j->uncomp_data, fp->uncompressed_block, j->uncomp_len);
1873
1874
        // Need non-block vers & job_pending?
1875
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_encode_func, j,
1876
0
                                job_cleanup, job_cleanup, 0) < 0) {
1877
0
            goto fail;
1878
0
        }
1879
0
    }
1880
1881
0
    fp->block_offset = 0;
1882
0
    return 0;
1883
1884
0
 fail:
1885
0
    job_cleanup(j);
1886
0
    pthread_mutex_lock(&mt->job_pool_m);
1887
0
    mt->jobs_pending--;
1888
0
    pthread_mutex_unlock(&mt->job_pool_m);
1889
0
    return -1;
1890
0
}
1891
1892
static int mt_flush_queue(BGZF *fp)
1893
0
{
1894
0
    mtaux_t *mt = fp->mt;
1895
1896
    // Drain the encoder jobs.
1897
    // We cannot use hts_tpool_flush here as it can cause deadlock if
1898
    // the queue is full of decoder tasks.  The best solution would
1899
    // be to have one input queue per type of job, but we don't right now.
1900
    //hts_tpool_flush(mt->pool);
1901
0
    pthread_mutex_lock(&mt->job_pool_m);
1902
0
    int shutdown = 0;
1903
0
    while (mt->jobs_pending != 0) {
1904
0
        if ((shutdown = hts_tpool_process_is_shutdown(mt->out_queue)))
1905
0
            break;
1906
0
        pthread_mutex_unlock(&mt->job_pool_m);
1907
0
        hts_usleep(10000); // FIXME: replace by condition variable
1908
0
        pthread_mutex_lock(&mt->job_pool_m);
1909
0
    }
1910
0
    pthread_mutex_unlock(&mt->job_pool_m);
1911
1912
0
    if (shutdown)
1913
0
        return -1;
1914
1915
    // Wait on bgzf_mt_writer to drain the queue
1916
0
    if (hts_tpool_process_flush(mt->out_queue) != 0)
1917
0
        return -1;
1918
1919
0
    return (fp->errcode == 0)? 0 : -1;
1920
0
}
1921
1922
static int lazy_flush(BGZF *fp)
1923
20.2k
{
1924
20.2k
    if (fp->mt)
1925
0
        return fp->block_offset ? mt_queue(fp) : 0;
1926
20.2k
    else
1927
20.2k
        return bgzf_flush(fp);
1928
20.2k
}
1929
1930
#else  // ~ #ifdef BGZF_MT
1931
1932
int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1933
{
1934
    return 0;
1935
}
1936
1937
static inline int lazy_flush(BGZF *fp)
1938
{
1939
    return bgzf_flush(fp);
1940
}
1941
1942
#endif // ~ #ifdef BGZF_MT
1943
1944
int bgzf_flush(BGZF *fp)
1945
34.7k
{
1946
34.7k
    if (!fp->is_write) return 0;
1947
34.7k
#ifdef BGZF_MT
1948
34.7k
    if (fp->mt) {
1949
0
        int ret = 0;
1950
0
        if (fp->block_offset) ret = mt_queue(fp);
1951
0
        if (!ret) ret = mt_flush_queue(fp);
1952
1953
        // We maintain mt->block_address when threading as the
1954
        // main code can call bgzf_tell without any locks.
1955
        // (The results from tell are wrong, but we only care about the last
1956
        // 16 bits' worth, except for the final flush process.)
1957
0
        pthread_mutex_lock(&fp->mt->idx_m);
1958
0
        fp->block_address = fp->mt->block_address;
1959
0
        pthread_mutex_unlock(&fp->mt->idx_m);
1960
1961
0
        return ret;
1962
0
    }
1963
34.7k
#endif
1964
1965
34.7k
    if (!fp->is_compressed) {
1966
0
        return hflush(fp->fp);
1967
0
    }
1968
1969
64.1k
    while (fp->block_offset > 0) {
1970
29.3k
        int block_length;
1971
29.3k
        if ( fp->idx_build_otf )
1972
0
        {
1973
0
            bgzf_index_add_block(fp);
1974
0
            fp->idx->ublock_addr += fp->block_offset;
1975
0
        }
1976
29.3k
        block_length = deflate_block(fp, fp->block_offset);
1977
29.3k
        if (block_length < 0) {
1978
0
            hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length, NULL));
1979
0
            return -1;
1980
0
        }
1981
29.3k
        if (hwrite(fp->fp, fp->compressed_block, block_length) != block_length) {
1982
0
            hts_log_error("File write failed (wrong size)");
1983
0
            fp->errcode |= BGZF_ERR_IO; // possibly truncated file
1984
0
            return -1;
1985
0
        }
1986
29.3k
        fp->block_address += block_length;
1987
29.3k
    }
1988
34.7k
    return 0;
1989
34.7k
}
1990
1991
int bgzf_flush_try(BGZF *fp, ssize_t size)
1992
4.23M
{
1993
4.23M
    if (fp->block_offset + size > BGZF_BLOCK_SIZE) return lazy_flush(fp);
1994
4.23M
    return 0;
1995
4.23M
}
1996
1997
ssize_t bgzf_write(BGZF *fp, const void *data, size_t length)
1998
35.8k
{
1999
35.8k
    if ( !fp->is_compressed ) {
2000
0
        size_t push = length + (size_t) fp->block_offset;
2001
0
        fp->block_offset = push % BGZF_MAX_BLOCK_SIZE;
2002
0
        fp->block_address += (push - fp->block_offset);
2003
0
        return hwrite(fp->fp, data, length);
2004
0
    }
2005
2006
35.8k
    const uint8_t *input = (const uint8_t*)data;
2007
35.8k
    ssize_t remaining = length;
2008
35.8k
    assert(fp->is_write);
2009
88.3k
    while (remaining > 0) {
2010
52.4k
        uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
2011
52.4k
        int copy_length = BGZF_BLOCK_SIZE - fp->block_offset;
2012
52.4k
        if (copy_length > remaining) copy_length = remaining;
2013
52.4k
        memcpy(buffer + fp->block_offset, input, copy_length);
2014
52.4k
        fp->block_offset += copy_length;
2015
52.4k
        input += copy_length;
2016
52.4k
        remaining -= copy_length;
2017
52.4k
        if (fp->block_offset == BGZF_BLOCK_SIZE) {
2018
16.7k
            if (lazy_flush(fp) != 0) return -1;
2019
16.7k
        }
2020
52.4k
    }
2021
35.8k
    return length - remaining;
2022
35.8k
}
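In normal use none of this buffering is visible to the caller: bgzf_write() accumulates data into the roughly 64 KiB uncompressed block (BGZF_BLOCK_SIZE) and lazy_flush() compresses and writes each block as it fills. A hedged usage sketch with an illustrative file name:

/* Illustrative sketch: writing a BGZF-compressed file ("demo.txt.gz" is a
 * made-up path). */
#include <string.h>
#include "htslib/bgzf.h"

static int write_demo(void)
{
    BGZF *fp = bgzf_open("demo.txt.gz", "w");   /* "w" selects BGZF output */
    if (!fp) return -1;

    const char *line = "hello bgzf\n";
    if (bgzf_write(fp, line, strlen(line)) < 0) {
        bgzf_close(fp);
        return -1;
    }
    /* bgzf_close() flushes the final partial block and appends the
     * 28-byte EOF marker block. */
    return bgzf_close(fp);
}
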
2023
2024
ssize_t bgzf_block_write(BGZF *fp, const void *data, size_t length)
2025
0
{
2026
0
    if ( !fp->is_compressed ) {
2027
0
        size_t push = length + (size_t) fp->block_offset;
2028
0
        fp->block_offset = push % BGZF_MAX_BLOCK_SIZE;
2029
0
        fp->block_address += (push - fp->block_offset);
2030
0
        return hwrite(fp->fp, data, length);
2031
0
    }
2032
2033
0
    const uint8_t *input = (const uint8_t*)data;
2034
0
    ssize_t remaining = length;
2035
0
    assert(fp->is_write);
2036
0
    uint64_t current_block; //keep track of current block
2037
0
    uint64_t ublock_size; // amount of uncompressed data to be fed into next block
2038
0
    while (remaining > 0) {
2039
0
        current_block = fp->idx->moffs - fp->idx->noffs;
2040
0
        ublock_size = current_block + 1 < fp->idx->moffs ? fp->idx->offs[current_block+1].uaddr-fp->idx->offs[current_block].uaddr : BGZF_MAX_BLOCK_SIZE;
2041
0
        uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
2042
0
        int copy_length = ublock_size - fp->block_offset;
2043
0
        if (copy_length > remaining) copy_length = remaining;
2044
0
        memcpy(buffer + fp->block_offset, input, copy_length);
2045
0
        fp->block_offset += copy_length;
2046
0
        input += copy_length;
2047
0
        remaining -= copy_length;
2048
0
        if (fp->block_offset == ublock_size) {
2049
0
            if (lazy_flush(fp) != 0) return -1;
2050
0
            if (fp->idx->noffs > 0)
2051
0
                fp->idx->noffs--;  // decrement noffs to track the blocks
2052
0
        }
2053
0
    }
2054
0
    return length - remaining;
2055
0
}
2056
2057
2058
ssize_t bgzf_raw_write(BGZF *fp, const void *data, size_t length)
2059
0
{
2060
0
    ssize_t ret = hwrite(fp->fp, data, length);
2061
0
    if (ret < 0) fp->errcode |= BGZF_ERR_IO;
2062
0
    return ret;
2063
0
}
2064
2065
// Helper function for tidying up fp->mt and setting errcode
2066
11.8k
static void bgzf_close_mt(BGZF *fp) {
2067
11.8k
    if (fp->mt) {
2068
0
        if (!fp->mt->free_block)
2069
0
            fp->uncompressed_block = NULL;
2070
0
        if (mt_destroy(fp->mt) < 0)
2071
0
            fp->errcode = BGZF_ERR_IO;
2072
0
    }
2073
11.8k
}
2074
2075
int bgzf_close(BGZF* fp)
2076
12.2k
{
2077
12.2k
    int ret, block_length;
2078
12.2k
    if (fp == 0) return -1;
2079
11.8k
    if (fp->is_write && fp->is_compressed) {
2080
7.59k
        if (bgzf_flush(fp) != 0) {
2081
0
            bgzf_close_mt(fp);
2082
0
            return -1;
2083
0
        }
2084
7.59k
        fp->compress_level = -1;
2085
7.59k
        block_length = deflate_block(fp, 0); // write an empty block
2086
7.59k
        if (block_length < 0) {
2087
0
            hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length, NULL));
2088
0
            bgzf_close_mt(fp);
2089
0
            return -1;
2090
0
        }
2091
7.59k
        if (hwrite(fp->fp, fp->compressed_block, block_length) < 0
2092
7.59k
            || hflush(fp->fp) != 0) {
2093
0
            hts_log_error("File write failed");
2094
0
            fp->errcode |= BGZF_ERR_IO;
2095
0
            return -1;
2096
0
        }
2097
7.59k
    }
2098
2099
11.8k
    bgzf_close_mt(fp);
2100
2101
11.8k
    if ( fp->is_gzip )
2102
2.15k
    {
2103
2.15k
        if (fp->gz_stream == NULL) ret = Z_OK;
2104
1.55k
        else if (!fp->is_write) ret = inflateEnd(fp->gz_stream);
2105
0
        else ret = deflateEnd(fp->gz_stream);
2106
2.15k
        if (ret != Z_OK) {
2107
0
            hts_log_error("Call to inflateEnd/deflateEnd failed: %s", bgzf_zerr(ret, NULL));
2108
0
        }
2109
2.15k
        free(fp->gz_stream);
2110
2.15k
    }
2111
11.8k
    ret = hclose(fp->fp);
2112
11.8k
    if (ret != 0) return -1;
2113
11.8k
    bgzf_index_destroy(fp);
2114
11.8k
    free(fp->uncompressed_block);
2115
11.8k
    free_cache(fp);
2116
11.8k
    ret = fp->errcode ? -1 : 0;
2117
11.8k
    free(fp);
2118
11.8k
    return ret;
2119
11.8k
}
2120
2121
void bgzf_set_cache_size(BGZF *fp, int cache_size)
2122
0
{
2123
0
    if (fp && fp->mt) return; // Not appropriate when multi-threading
2124
0
    if (fp && fp->cache) fp->cache_size = cache_size;
2125
0
}
2126
2127
780
int bgzf_check_EOF(BGZF *fp) {
2128
780
    int has_eof;
2129
2130
780
    if (fp->mt) {
2131
0
        pthread_mutex_lock(&fp->mt->command_m);
2132
        // fp->mt->command state transitions should be:
2133
        // NONE -> HAS_EOF -> HAS_EOF_DONE -> NONE
2134
        // (HAS_EOF -> HAS_EOF_DONE happens in bgzf_mt_reader thread)
2135
0
        if (fp->mt->command != CLOSE)
2136
0
            fp->mt->command = HAS_EOF;
2137
0
        pthread_cond_signal(&fp->mt->command_c);
2138
0
        hts_tpool_wake_dispatch(fp->mt->out_queue);
2139
0
        do {
2140
0
            if (fp->mt->command == CLOSE) {
2141
                // possible error in bgzf_mt_reader
2142
0
                pthread_mutex_unlock(&fp->mt->command_m);
2143
0
                return 0;
2144
0
            }
2145
0
            pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
2146
0
            switch (fp->mt->command) {
2147
0
            case HAS_EOF_DONE: break;
2148
0
            case HAS_EOF:
2149
                // Resend signal intended for bgzf_mt_reader()
2150
0
                pthread_cond_signal(&fp->mt->command_c);
2151
0
                break;
2152
0
            case CLOSE:
2153
0
                continue;
2154
0
            default:
2155
0
                abort();  // Should not get to any other state
2156
0
            }
2157
0
        } while (fp->mt->command != HAS_EOF_DONE);
2158
0
        fp->mt->command = NONE;
2159
0
        has_eof = fp->mt->eof;
2160
0
        pthread_mutex_unlock(&fp->mt->command_m);
2161
780
    } else {
2162
780
        has_eof = bgzf_check_EOF_common(fp);
2163
780
    }
2164
2165
780
    fp->no_eof_block = (has_eof == 0);
2166
2167
780
    return has_eof;
2168
780
}
2169
2170
static inline int64_t bgzf_seek_common(BGZF* fp,
2171
                                       int64_t block_address, int block_offset)
2172
0
{
2173
0
    if (fp->mt) {
2174
        // The reader runs asynchronous and does loops of:
2175
        //    Read block
2176
        //    Check & process command
2177
        //    Dispatch decode job
2178
        //
2179
        // Once at EOF it then switches to loops of
2180
        //    Wait for command
2181
        //    Process command (possibly switching back to above loop).
2182
        //
2183
        // To seek we therefore send the reader thread a SEEK command,
2184
        // waking it up if blocked in dispatch and signalling if
2185
        // waiting for a command.  We then wait for the response so we
2186
        // know the seek succeeded.
2187
0
        pthread_mutex_lock(&fp->mt->command_m);
2188
0
        fp->mt->hit_eof = 0;
2189
        // fp->mt->command state transitions should be:
2190
        // NONE -> SEEK -> SEEK_DONE -> NONE
2191
        // (SEEK -> SEEK_DONE happens in bgzf_mt_reader thread)
2192
0
        fp->mt->command = SEEK;
2193
0
        fp->mt->block_address = block_address;
2194
0
        pthread_cond_signal(&fp->mt->command_c);
2195
0
        hts_tpool_wake_dispatch(fp->mt->out_queue);
2196
0
        do {
2197
0
            pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
2198
0
            switch (fp->mt->command) {
2199
0
            case SEEK_DONE: break;
2200
0
            case SEEK:
2201
                // Resend signal intended for bgzf_mt_reader()
2202
0
                pthread_cond_signal(&fp->mt->command_c);
2203
0
                break;
2204
0
            default:
2205
0
                abort();  // Should not get to any other state
2206
0
            }
2207
0
        } while (fp->mt->command != SEEK_DONE);
2208
2209
0
        fp->mt->command = NONE;
2210
2211
0
        if (fp->mt->errcode) {
2212
0
            fp->errcode |= BGZF_ERR_IO;
2213
0
            errno = fp->mt->errcode;
2214
0
            pthread_mutex_unlock(&fp->mt->command_m);
2215
0
            return -1;
2216
0
        }
2217
2218
0
        fp->block_length = 0;  // indicates current block has not been loaded
2219
0
        fp->block_address = block_address;
2220
0
        fp->block_offset = block_offset;
2221
2222
0
        pthread_mutex_unlock(&fp->mt->command_m);
2223
0
    } else {
2224
0
        if (hseek(fp->fp, block_address, SEEK_SET) < 0) {
2225
0
            fp->errcode |= BGZF_ERR_IO;
2226
0
            return -1;
2227
0
        }
2228
0
        fp->block_length = 0;  // indicates current block has not been loaded
2229
0
        fp->block_address = block_address;
2230
0
        fp->block_offset = block_offset;
2231
0
    }
2232
2233
0
    return 0;
2234
0
}
2235
2236
int64_t bgzf_seek(BGZF* fp, int64_t pos, int where)
2237
0
{
2238
0
    if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
2239
0
        fp->errcode |= BGZF_ERR_MISUSE;
2240
0
        return -1;
2241
0
    }
2242
2243
    // This is a flag to indicate we've jumped elsewhere in the stream, to act
2244
    // as a hint to any other code that wraps bgzf for its own
2245
    // purposes.  We may not be able to tell when a seek happens, as it can be
2246
    // done on our behalf, e.g. by the iterator.
2247
    //
2248
    // This is never cleared here.  Any tool that needs to handle it is also
2249
    // responsible for clearing it.
2250
0
    fp->seeked = pos;
2251
2252
0
    return bgzf_seek_common(fp, pos >> 16, pos & 0xFFFF);
2253
0
}
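The pos >> 16 / pos & 0xFFFF split above reflects the BGZF virtual offset convention: the compressed block address lives in the upper bits and the offset within the uncompressed block in the low 16 bits, which is exactly what bgzf_tell() produces. A hedged sketch of saving and later restoring a position:

/* Illustrative sketch: bgzf_seek() only understands virtual offsets that
 * were previously obtained from bgzf_tell(). */
#include <stdio.h>
#include <stdint.h>
#include "htslib/bgzf.h"

static int remember_and_return(BGZF *fp)
{
    int64_t voffset = bgzf_tell(fp);   /* (block_address << 16) | block_offset */

    /* ... read ahead with bgzf_read()/bgzf_getline() ... */

    if (bgzf_seek(fp, voffset, SEEK_SET) < 0)   /* only SEEK_SET is accepted */
        return -1;
    return 0;
}
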
2254
2255
int bgzf_is_bgzf(const char *fn)
2256
0
{
2257
0
    uint8_t buf[16];
2258
0
    int n;
2259
0
    hFILE *fp;
2260
0
    if ((fp = hopen(fn, "r")) == 0) return 0;
2261
0
    n = hread(fp, buf, 16);
2262
0
    if (hclose(fp) < 0) return 0;
2263
0
    if (n != 16) return 0;
2264
0
    return check_header(buf) == 0? 1 : 0;
2265
0
}
2266
2267
int bgzf_compression(BGZF *fp)
2268
0
{
2269
0
    return (!fp->is_compressed)? no_compression : (fp->is_gzip)? gzip : bgzf;
2270
0
}
2271
2272
int bgzf_getc(BGZF *fp)
2273
1.61k
{
2274
1.61k
    if (fp->block_offset+1 < fp->block_length) {
2275
0
        fp->uncompressed_address++;
2276
0
        return ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
2277
0
    }
2278
2279
1.61k
    int c;
2280
1.61k
    if (fp->block_offset >= fp->block_length) {
2281
1.17k
        if (bgzf_read_block(fp) != 0) return -2; /* error */
2282
1.16k
        if (fp->block_length == 0) return -1; /* end-of-file */
2283
1.16k
    }
2284
1.03k
    c = ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
2285
1.03k
    if (fp->block_offset == fp->block_length) {
2286
446
        fp->block_address = bgzf_htell(fp);
2287
446
        fp->block_offset = 0;
2288
446
        fp->block_length = 0;
2289
446
    }
2290
1.03k
    fp->uncompressed_address++;
2291
1.03k
    return c;
2292
1.61k
}
2293
2294
int bgzf_getline(BGZF *fp, int delim, kstring_t *str)
2295
123M
{
2296
123M
    int l, state = 0;
2297
123M
    str->l = 0;
2298
123M
    do {
2299
123M
        if (fp->block_offset >= fp->block_length) {
2300
80.0k
            if (bgzf_read_block(fp) != 0) { state = -2; break; }
2301
79.1k
            if (fp->block_length == 0) { state = -1; break; }
2302
79.1k
        }
2303
123M
        unsigned char *buf = fp->uncompressed_block;
2304
2305
        // Equivalent to a naive byte by byte search from
2306
        // buf + block_offset to buf + block_length.
2307
123M
        void *e = memchr(&buf[fp->block_offset], delim,
2308
123M
                         fp->block_length - fp->block_offset);
2309
123M
        l = e ? (unsigned char *)e - buf : fp->block_length;
2310
2311
123M
        if (l < fp->block_length) state = 1;
2312
123M
        l -= fp->block_offset;
2313
123M
        if (ks_expand(str, l + 2) < 0) { state = -3; break; }
2314
123M
        memcpy(str->s + str->l, buf + fp->block_offset, l);
2315
123M
        str->l += l;
2316
123M
        fp->block_offset += l + 1;
2317
123M
        if (fp->block_offset >= fp->block_length) {
2318
78.4k
            fp->block_address = bgzf_htell(fp);
2319
78.4k
            fp->block_offset = 0;
2320
78.4k
            fp->block_length = 0;
2321
78.4k
        }
2322
123M
    } while (state == 0);
2323
123M
    if (state < -1) return state;
2324
123M
    if (str->l == 0 && state < 0) return state;
2325
123M
    fp->uncompressed_address += str->l + 1;
2326
123M
    if ( delim=='\n' && str->l>0 && str->s[str->l-1]=='\r' ) str->l--;
2327
123M
    str->s[str->l] = 0;
2328
123M
    return str->l <= INT_MAX ? (int) str->l : INT_MAX;
2329
123M
}
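bgzf_getline() appends the bytes up to (but not including) the delimiter into a caller-owned kstring_t, strips a trailing '\r' when the delimiter is '\n', and returns the line length, with -1 for end of file and values below -1 for errors. A hedged usage sketch with an illustrative file name:

/* Illustrative sketch: counting lines in a BGZF/gzip-compressed text file
 * ("data.txt.gz" is a made-up path). */
#include <stdlib.h>
#include "htslib/bgzf.h"
#include "htslib/kstring.h"

static int count_lines(void)
{
    BGZF *fp = bgzf_open("data.txt.gz", "r");
    if (!fp) return -1;

    kstring_t str = { 0, 0, NULL };  /* buffer is reused across calls */
    int n = 0, ret;
    while ((ret = bgzf_getline(fp, '\n', &str)) >= 0)
        n++;

    free(str.s);
    bgzf_close(fp);
    return ret < -1 ? -1 : n;        /* -1 is plain EOF; below -1 is an error */
}
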
2330
2331
void bgzf_index_destroy(BGZF *fp)
2332
11.8k
{
2333
11.8k
    if ( !fp->idx ) return;
2334
5
    free(fp->idx->offs);
2335
5
    free(fp->idx);
2336
5
    fp->idx = NULL;
2337
5
    fp->idx_build_otf = 0;
2338
5
}
2339
2340
int bgzf_index_build_init(BGZF *fp)
2341
5
{
2342
5
    bgzf_index_destroy(fp);
2343
5
    fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
2344
5
    if ( !fp->idx ) return -1;
2345
5
    fp->idx_build_otf = 1;  // build index on the fly
2346
5
    return 0;
2347
5
}
2348
2349
int bgzf_index_add_block(BGZF *fp)
2350
0
{
2351
0
    fp->idx->noffs++;
2352
0
    if ( fp->idx->noffs > fp->idx->moffs )
2353
0
    {
2354
0
        fp->idx->moffs = fp->idx->noffs;
2355
0
        kroundup32(fp->idx->moffs);
2356
0
        fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
2357
0
        if ( !fp->idx->offs ) return -1;
2358
0
    }
2359
0
    fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->ublock_addr;
2360
0
    fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->block_address;
2361
0
    return 0;
2362
0
}
2363
2364
static inline int hwrite_uint64(uint64_t x, hFILE *f)
2365
0
{
2366
0
    if (ed_is_big()) x = ed_swap_8(x);
2367
0
    if (hwrite(f, &x, sizeof(x)) != sizeof(x)) return -1;
2368
0
    return 0;
2369
0
}
2370
2371
static char * get_name_suffix(const char *bname, const char *suffix)
2372
0
{
2373
0
    size_t len = strlen(bname) + strlen(suffix) + 1;
2374
0
    char *buff = malloc(len);
2375
0
    if (!buff) return NULL;
2376
0
    snprintf(buff, len, "%s%s", bname, suffix);
2377
0
    return buff;
2378
0
}
2379
2380
int bgzf_index_dump_hfile(BGZF *fp, struct hFILE *idx, const char *name)
2381
0
{
2382
    // Note that the index contains one extra record when indexing files opened
2383
    // for reading. The terminating record is not present when opened for writing.
2384
    // This is not a bug.
2385
2386
0
    int i;
2387
2388
0
    if (!fp->idx) {
2389
0
        hts_log_error("Called for BGZF handle with no index");
2390
0
        errno = EINVAL;
2391
0
        return -1;
2392
0
    }
2393
2394
0
    if (bgzf_flush(fp) != 0) return -1;
2395
2396
    // discard the entry marking the end of the file
2397
0
    if (fp->mt && fp->idx)
2398
0
        fp->idx->noffs--;
2399
2400
0
    if (hwrite_uint64(fp->idx->noffs - 1, idx) < 0) goto fail;
2401
0
    for (i=1; i<fp->idx->noffs; i++)
2402
0
    {
2403
0
        if (hwrite_uint64(fp->idx->offs[i].caddr, idx) < 0) goto fail;
2404
0
        if (hwrite_uint64(fp->idx->offs[i].uaddr, idx) < 0) goto fail;
2405
0
    }
2406
0
    return 0;
2407
2408
0
 fail:
2409
0
    hts_log_error("Error writing to %s : %s", name ? name : "index", strerror(errno));
2410
0
    return -1;
2411
0
}
2412
2413
int bgzf_index_dump(BGZF *fp, const char *bname, const char *suffix)
2414
0
{
2415
0
    const char *name = bname, *msg = NULL;
2416
0
    char *tmp = NULL;
2417
0
    hFILE *idx = NULL;
2418
2419
0
    if (!fp->idx) {
2420
0
        hts_log_error("Called for BGZF handle with no index");
2421
0
        errno = EINVAL;
2422
0
        return -1;
2423
0
    }
2424
2425
0
    if ( suffix )
2426
0
    {
2427
0
        tmp = get_name_suffix(bname, suffix);
2428
0
        if ( !tmp ) return -1;
2429
0
        name = tmp;
2430
0
    }
2431
2432
0
    idx = hopen(name, "wb");
2433
0
    if ( !idx ) {
2434
0
        msg = "Error opening";
2435
0
        goto fail;
2436
0
    }
2437
2438
0
    if (bgzf_index_dump_hfile(fp, idx, name) != 0) goto fail;
2439
2440
0
    if (hclose(idx) < 0)
2441
0
    {
2442
0
        idx = NULL;
2443
0
        msg = "Error on closing";
2444
0
        goto fail;
2445
0
    }
2446
2447
0
    free(tmp);
2448
0
    return 0;
2449
2450
0
 fail:
2451
0
    if (msg != NULL) {
2452
0
        hts_log_error("%s %s : %s", msg, name, strerror(errno));
2453
0
    }
2454
0
    if (idx) hclose_abruptly(idx);
2455
0
    free(tmp);
2456
0
    return -1;
2457
0
}
2458
2459
static inline int hread_uint64(uint64_t *xptr, hFILE *f)
2460
0
{
2461
0
    if (hread(f, xptr, sizeof(*xptr)) != sizeof(*xptr)) return -1;
2462
0
    if (ed_is_big()) ed_swap_8p(xptr);
2463
0
    return 0;
2464
0
}
2465
2466
int bgzf_index_load_hfile(BGZF *fp, struct hFILE *idx, const char *name)
2467
0
{
2468
0
    fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
2469
0
    if (fp->idx == NULL) goto fail;
2470
0
    uint64_t x;
2471
0
    if (hread_uint64(&x, idx) < 0) goto fail;
2472
2473
0
    fp->idx->noffs = fp->idx->moffs = x + 1;
2474
0
    fp->idx->offs  = (bgzidx1_t*) malloc(fp->idx->moffs*sizeof(bgzidx1_t));
2475
0
    if (fp->idx->offs == NULL) goto fail;
2476
0
    fp->idx->offs[0].caddr = fp->idx->offs[0].uaddr = 0;
2477
2478
0
    int i;
2479
0
    for (i=1; i<fp->idx->noffs; i++)
2480
0
    {
2481
0
        if (hread_uint64(&fp->idx->offs[i].caddr, idx) < 0) goto fail;
2482
0
        if (hread_uint64(&fp->idx->offs[i].uaddr, idx) < 0) goto fail;
2483
0
    }
2484
2485
0
    return 0;
2486
2487
0
 fail:
2488
0
    hts_log_error("Error reading %s : %s", name ? name : "index", strerror(errno));
2489
0
    if (fp->idx) {
2490
0
        free(fp->idx->offs);
2491
0
        free(fp->idx);
2492
0
        fp->idx = NULL;
2493
0
    }
2494
0
    return -1;
2495
0
}
2496
2497
int bgzf_index_load(BGZF *fp, const char *bname, const char *suffix)
2498
0
{
2499
0
    const char *name = bname, *msg = NULL;
2500
0
    char *tmp = NULL;
2501
0
    hFILE *idx = NULL;
2502
0
    if ( suffix )
2503
0
    {
2504
0
        tmp = get_name_suffix(bname, suffix);
2505
0
        if ( !tmp ) return -1;
2506
0
        name = tmp;
2507
0
    }
2508
2509
0
    idx = hopen(name, "rb");
2510
0
    if ( !idx ) {
2511
0
        msg = "Error opening";
2512
0
        goto fail;
2513
0
    }
2514
2515
0
    if (bgzf_index_load_hfile(fp, idx, name) != 0) goto fail;
2516
2517
0
    if (hclose(idx) != 0) {
2518
0
        idx = NULL;
2519
0
        msg = "Error closing";
2520
0
        goto fail;
2521
0
    }
2522
2523
0
    free(tmp);
2524
0
    return 0;
2525
2526
0
 fail:
2527
0
    if (msg != NULL) {
2528
0
        hts_log_error("%s %s : %s", msg, name, strerror(errno));
2529
0
    }
2530
0
    if (idx) hclose_abruptly(idx);
2531
0
    free(tmp);
2532
0
    return -1;
2533
0
}
2534
2535
int bgzf_useek(BGZF *fp, off_t uoffset, int where)
2536
0
{
2537
0
    if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
2538
0
        fp->errcode |= BGZF_ERR_MISUSE;
2539
0
        return -1;
2540
0
    }
2541
0
    if (uoffset >= fp->uncompressed_address - fp->block_offset &&
2542
0
        uoffset < fp->uncompressed_address + fp->block_length - fp->block_offset) {
2543
        // Can seek into existing data
2544
0
        fp->block_offset += uoffset - fp->uncompressed_address;
2545
0
        fp->uncompressed_address = uoffset;
2546
0
        return 0;
2547
0
    }
2548
0
    if ( !fp->is_compressed )
2549
0
    {
2550
0
        if (hseek(fp->fp, uoffset, SEEK_SET) < 0)
2551
0
        {
2552
0
            fp->errcode |= BGZF_ERR_IO;
2553
0
            return -1;
2554
0
        }
2555
0
        fp->block_length = 0;  // indicates current block has not been loaded
2556
0
        fp->block_address = uoffset;
2557
0
        fp->block_offset = 0;
2558
0
        if (bgzf_read_block(fp) < 0) {
2559
0
            fp->errcode |= BGZF_ERR_IO;
2560
0
            return -1;
2561
0
        }
2562
0
        fp->uncompressed_address = uoffset;
2563
0
        return 0;
2564
0
    }
2565
2566
0
    if ( !fp->idx )
2567
0
    {
2568
0
        fp->errcode |= BGZF_ERR_IO;
2569
0
        return -1;
2570
0
    }
2571
2572
    // binary search
2573
0
    int ilo = 0, ihi = fp->idx->noffs - 1;
2574
0
    while ( ilo<=ihi )
2575
0
    {
2576
0
        int i = (ilo+ihi)*0.5;
2577
0
        if ( uoffset < fp->idx->offs[i].uaddr ) ihi = i - 1;
2578
0
        else if ( uoffset >= fp->idx->offs[i].uaddr ) ilo = i + 1;
2579
0
        else break;
2580
0
    }
2581
0
    int i = ilo-1;
2582
0
    off_t offset = 0;
2583
0
    if (bgzf_seek_common(fp, fp->idx->offs[i].caddr, 0) < 0)
2584
0
        return -1;
2585
2586
0
    if ( bgzf_read_block(fp) < 0 ) {
2587
0
        fp->errcode |= BGZF_ERR_IO;
2588
0
        return -1;
2589
0
    }
2590
0
    offset = uoffset - fp->idx->offs[i].uaddr;
2591
0
    if ( offset > 0 )
2592
0
    {
2593
0
        if (offset > fp->block_length) {
2594
0
            fp->errcode |= BGZF_ERR_IO;
2595
0
            return -1;                                      //offset outside the available data
2596
0
        }
2597
0
        fp->block_offset = offset;
2598
0
        assert( fp->block_offset <= fp->block_length );     // todo: skipped, unindexed blocks
2599
0
    }
2600
0
    fp->uncompressed_address = uoffset;
2601
0
    return 0;
2602
0
}
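Seeking by uncompressed offset therefore needs either an uncompressed stream or an index mapping uncompressed addresses to compressed block starts. The index can be built on the fly while writing and saved with bgzf_index_dump(), then loaded again before calling bgzf_useek(). A hedged usage sketch; the path is illustrative and ".gzi" is the customary index suffix:

/* Illustrative sketch: build an index while writing, then seek by
 * uncompressed offset on re-reading ("big.txt.gz" is a made-up path). */
#include <stdio.h>
#include <sys/types.h>
#include "htslib/bgzf.h"

static int write_indexed_then_useek(const void *data, size_t len, off_t uoffset)
{
    BGZF *out = bgzf_open("big.txt.gz", "w");
    if (!out) return -1;
    if (bgzf_index_build_init(out) < 0 ||
        bgzf_write(out, data, len) < 0 ||
        bgzf_index_dump(out, "big.txt.gz", ".gzi") < 0) {
        bgzf_close(out);
        return -1;
    }
    if (bgzf_close(out) < 0) return -1;

    BGZF *in = bgzf_open("big.txt.gz", "r");
    if (!in) return -1;
    if (bgzf_index_load(in, "big.txt.gz", ".gzi") < 0 ||
        bgzf_useek(in, uoffset, SEEK_SET) < 0) {
        bgzf_close(in);
        return -1;
    }
    /* ... bgzf_read() now continues from uncompressed offset uoffset ... */
    return bgzf_close(in);
}
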
2603
2604
off_t bgzf_utell(BGZF *fp)
2605
16.7k
{
2606
16.7k
    return fp->uncompressed_address;    // currently maintained only when reading
2607
16.7k
}
2608
2609
/* prototype is in hfile_internal.h */
2610
0
struct hFILE *bgzf_hfile(struct BGZF *fp) {
2611
0
    return fp->fp;
2612
0
}