Coverage Report

Created: 2023-01-24 06:07

/src/htslib/bgzf.c
Line
Count
Source
1
/* The MIT License
2
3
   Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4
                 2011, 2012 Attractive Chaos <attractor@live.co.uk>
5
   Copyright (C) 2009, 2013-2021 Genome Research Ltd
6
7
   Permission is hereby granted, free of charge, to any person obtaining a copy
8
   of this software and associated documentation files (the "Software"), to deal
9
   in the Software without restriction, including without limitation the rights
10
   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11
   copies of the Software, and to permit persons to whom the Software is
12
   furnished to do so, subject to the following conditions:
13
14
   The above copyright notice and this permission notice shall be included in
15
   all copies or substantial portions of the Software.
16
17
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19
   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20
   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21
   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22
   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23
   THE SOFTWARE.
24
*/
25
26
#define HTS_BUILDING_LIBRARY // Enables HTSLIB_EXPORT, see htslib/hts_defs.h
27
#include <config.h>
28
29
#include <stdio.h>
30
#include <stdlib.h>
31
#include <string.h>
32
#include <errno.h>
33
#include <unistd.h>
34
#include <assert.h>
35
#include <pthread.h>
36
#include <sys/types.h>
37
#include <inttypes.h>
38
#include <zlib.h>
39
40
#ifdef HAVE_LIBDEFLATE
41
#include <libdeflate.h>
42
#endif
43
44
#include "htslib/hts.h"
45
#include "htslib/bgzf.h"
46
#include "htslib/hfile.h"
47
#include "htslib/thread_pool.h"
48
#include "htslib/hts_endian.h"
49
#include "cram/pooled_alloc.h"
50
#include "hts_internal.h"
51
52
#ifndef EFTYPE
53
1
#define EFTYPE ENOEXEC
54
#endif
55
56
#define BGZF_CACHE
57
#define BGZF_MT
58
59
4.41k
#define BLOCK_HEADER_LENGTH 18
60
0
#define BLOCK_FOOTER_LENGTH 8
61
62
63
/* BGZF/GZIP header (specialized from RFC 1952; little endian):
64
 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
65
 | 31|139|  8|  4|              0|  0|255|      6| 66| 67|      2|BLK_LEN|
66
 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
67
  BGZF extension:
68
                ^                              ^   ^   ^
69
                |                              |   |   |
70
               FLG.EXTRA                     XLEN  B   C
71
72
  BGZF format is compatible with GZIP. It limits the size of each compressed
73
  block to 2^16 bytes and adds an extra "BC" field in the gzip header which
74
  records the size.
75
76
*/
77
static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
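/* A minimal illustrative sketch (the helper name is hypothetical, not part of
   the library API): how the BSIZE ("BLK_LEN") field in the diagram above is
   recovered by a reader.  Bytes 16-17 of the 18-byte header store the total
   block length minus one, little endian; g_magic leaves them as a placeholder
   to be filled in when a block is written (see packInt16(&dst[16], *dlen - 1)
   later in this file). */
static inline int bgzf_block_length_sketch(const uint8_t *header)
{
    // mirrors unpackInt16((uint8_t*)&header[16]) + 1 used in bgzf_read_block()
    return (header[16] | header[17] << 8) + 1;
}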
78
79
#ifdef BGZF_CACHE
80
typedef struct {
81
    int size;
82
    uint8_t *block;
83
    int64_t end_offset;
84
} cache_t;
85
86
#include "htslib/khash.h"
87
KHASH_MAP_INIT_INT64(cache, cache_t)
88
#endif
89
90
struct bgzf_cache_t {
91
    khash_t(cache) *h;
92
    khint_t last_pos;
93
};
94
95
#ifdef BGZF_MT
96
97
typedef struct bgzf_job {
98
    BGZF *fp;
99
    unsigned char comp_data[BGZF_MAX_BLOCK_SIZE];
100
    size_t comp_len;
101
    unsigned char uncomp_data[BGZF_MAX_BLOCK_SIZE];
102
    size_t uncomp_len;
103
    int errcode;
104
    int64_t block_address;
105
    int hit_eof;
106
} bgzf_job;
107
108
enum mtaux_cmd {
109
    NONE = 0,
110
    SEEK,
111
    SEEK_DONE,
112
    HAS_EOF,
113
    HAS_EOF_DONE,
114
    CLOSE,
115
};
116
117
// When multi-threaded, bgzf_tell won't work, so we delay the hts_idx_push
118
// until we've written the last block.
119
typedef struct {
120
    hts_pos_t beg, end;
121
    int tid, is_mapped;  // args for hts_idx_push
122
    uint64_t offset, block_number;
123
} hts_idx_cache_entry;
124
125
typedef struct {
126
    int nentries, mentries; // used and allocated
127
    hts_idx_cache_entry *e; // hts_idx elements
128
} hts_idx_cache_t;
129
130
typedef struct bgzf_mtaux_t {
131
    // Memory pool for bgzf_job structs, to avoid many malloc/free
132
    pool_alloc_t *job_pool;
133
    bgzf_job *curr_job;
134
135
    // Thread pool
136
    int n_threads;
137
    int own_pool;
138
    hts_tpool *pool;
139
140
    // Output queue holding completed bgzf_jobs
141
    hts_tpool_process *out_queue;
142
143
    // I/O thread.
144
    pthread_t io_task;
145
    pthread_mutex_t job_pool_m;
146
    int jobs_pending; // number of jobs waiting
147
    int flush_pending;
148
    void *free_block;
149
    int hit_eof;  // r/w entirely within main thread
150
151
    // Message passing to the reader thread; eg seek requests
152
    int errcode;
153
    uint64_t block_address;
154
    int eof;
155
    pthread_mutex_t command_m; // Set whenever fp is being updated
156
    pthread_cond_t command_c;
157
    enum mtaux_cmd command;
158
159
    // For multi-threaded on-the-fly indexing. See bgzf_idx_push below.
160
    pthread_mutex_t idx_m;
161
    hts_idx_t *hts_idx;
162
    uint64_t block_number, block_written;
163
    hts_idx_cache_t idx_cache;
164
} mtaux_t;
165
#endif
166
167
typedef struct
168
{
169
    uint64_t uaddr;  // offset w.r.t. uncompressed data
170
    uint64_t caddr;  // offset w.r.t. compressed data
171
}
172
bgzidx1_t;
173
174
struct bgzidx_t
175
{
176
    int noffs, moffs;       // the size of the index, n:used, m:allocated
177
    bgzidx1_t *offs;        // offsets
178
    uint64_t ublock_addr;   // offset of the current block (uncompressed data)
179
};
180
181
/*
182
 * Buffers up arguments to hts_idx_push for later use, once we've written all bar
183
 * this block.  This is necessary when multiple blocks are in flight (threading)
184
 * and fp->block_address isn't known at the time of call as we have in-flight
185
 * blocks that haven't yet been compressed.
186
 *
187
 * NB: this only matters when we're indexing on the fly (writing).
188
 * Normal indexing uses threaded reads, but there we already know block sizes
189
 * so it's a simpler process
190
 *
191
 * Returns 0 on success,
192
 *        -1 on failure
193
 */
194
0
int bgzf_idx_push(BGZF *fp, hts_idx_t *hidx, int tid, hts_pos_t beg, hts_pos_t end, uint64_t offset, int is_mapped) {
195
0
    hts_idx_cache_entry *e;
196
0
    mtaux_t *mt = fp->mt;
197
198
0
    if (!mt)
199
0
        return hts_idx_push(hidx, tid, beg, end, offset, is_mapped);
200
201
    // Early check for out of range positions which would fail in hts_idx_push()
202
0
    if (hts_idx_check_range(hidx, tid, beg, end) < 0)
203
0
        return -1;
204
205
0
    pthread_mutex_lock(&mt->idx_m);
206
207
0
    mt->hts_idx = hidx;
208
0
    hts_idx_cache_t *ic = &mt->idx_cache;
209
210
0
    if (ic->nentries >= ic->mentries) {
211
0
        int new_sz = ic->mentries ? ic->mentries*2 : 1024;
212
0
        if (!(e = realloc(ic->e, new_sz * sizeof(*ic->e)))) {
213
0
            pthread_mutex_unlock(&mt->idx_m);
214
0
            return -1;
215
0
        }
216
0
        ic->e = e;
217
0
        ic->mentries = new_sz;
218
0
    }
219
220
0
    e = &ic->e[ic->nentries++];
221
0
    e->tid = tid;
222
0
    e->beg = beg;
223
0
    e->end = end;
224
0
    e->is_mapped = is_mapped;
225
0
    e->offset = offset & 0xffff;
226
0
    e->block_number = mt->block_number;
227
228
0
    pthread_mutex_unlock(&mt->idx_m);
229
230
0
    return 0;
231
0
}
232
233
/*
234
 * bgzf analogue to hts_idx_amend_last.
235
 *
236
 * This is needed when multi-threading and writing indices on the fly.
237
 * At the point of writing a record we know the virtual offset for start
238
 * and end, but that end virtual offset may be the end of the current
239
 * block.  In standard indexing our end virtual offset becomes the start
240
 * of the next block.  Thus to ensure bit for bit compatibility we
241
 * detect this boundary case and fix it up here.
242
 *
243
 * In theory this has no behavioural change, but it also works around
244
 * a bug elsewhere which causes bgzf_read to return 0 when our offset
245
 * is the end of a block rather than the start of the next.
246
 */
247
0
void bgzf_idx_amend_last(BGZF *fp, hts_idx_t *hidx, uint64_t offset) {
248
0
    mtaux_t *mt = fp->mt;
249
0
    if (!mt) {
250
0
        hts_idx_amend_last(hidx, offset);
251
0
        return;
252
0
    }
253
254
0
    pthread_mutex_lock(&mt->idx_m);
255
0
    hts_idx_cache_t *ic = &mt->idx_cache;
256
0
    if (ic->nentries > 0) {
257
0
        hts_idx_cache_entry *e = &ic->e[ic->nentries-1];
258
0
        if ((offset & 0xffff) == 0 && e->offset != 0) {
259
            // bumped to next block number
260
0
            e->offset = 0;
261
0
            e->block_number++;
262
0
        }
263
0
    }
264
0
    pthread_mutex_unlock(&mt->idx_m);
265
0
}
266
267
0
static int bgzf_idx_flush(BGZF *fp) {
268
0
    mtaux_t *mt = fp->mt;
269
270
0
    if (!mt->idx_cache.e) {
271
0
        mt->block_written++;
272
0
        return 0;
273
0
    }
274
275
0
    pthread_mutex_lock(&mt->idx_m);
276
277
0
    hts_idx_cache_entry *e = mt->idx_cache.e;
278
0
    int i;
279
280
0
    assert(mt->idx_cache.nentries == 0 || mt->block_written <= e[0].block_number);
281
282
0
    for (i = 0; i < mt->idx_cache.nentries && e[i].block_number == mt->block_written; i++) {
283
0
        if (hts_idx_push(mt->hts_idx, e[i].tid, e[i].beg, e[i].end,
284
0
                         (mt->block_address << 16) + e[i].offset,
285
0
                         e[i].is_mapped) < 0) {
286
0
            pthread_mutex_unlock(&mt->idx_m);
287
0
            return -1;
288
0
        }
289
0
    }
290
291
0
    memmove(&e[0], &e[i], (mt->idx_cache.nentries - i) * sizeof(*e));
292
0
    mt->idx_cache.nentries -= i;
293
0
    mt->block_written++;
294
295
0
    pthread_mutex_unlock(&mt->idx_m);
296
0
    return 0;
297
0
}
298
299
void bgzf_index_destroy(BGZF *fp);
300
int bgzf_index_add_block(BGZF *fp);
301
static int mt_destroy(mtaux_t *mt);
302
303
static inline void packInt16(uint8_t *buffer, uint16_t value)
304
0
{
305
0
    buffer[0] = value;
306
0
    buffer[1] = value >> 8;
307
0
}
308
309
static inline int unpackInt16(const uint8_t *buffer)
310
3.32k
{
311
3.32k
    return buffer[0] | buffer[1] << 8;
312
3.32k
}
313
314
static inline void packInt32(uint8_t *buffer, uint32_t value)
315
0
{
316
0
    buffer[0] = value;
317
0
    buffer[1] = value >> 8;
318
0
    buffer[2] = value >> 16;
319
0
    buffer[3] = value >> 24;
320
0
}
321
322
static void razf_info(hFILE *hfp, const char *filename)
323
1
{
324
1
    uint64_t usize, csize;
325
1
    off_t sizes_pos;
326
327
1
    if (filename == NULL || strcmp(filename, "-") == 0) filename = "FILE";
328
329
    // RAZF files end with USIZE,CSIZE stored as big-endian uint64_t
330
1
    if ((sizes_pos = hseek(hfp, -16, SEEK_END)) < 0) goto no_sizes;
331
1
    if (hread(hfp, &usize, 8) != 8 || hread(hfp, &csize, 8) != 8) goto no_sizes;
332
1
    if (!ed_is_big()) ed_swap_8p(&usize), ed_swap_8p(&csize);
333
1
    if (csize >= sizes_pos) goto no_sizes; // Very basic validity check
334
335
0
    hts_log_error(
336
0
"To decompress this file, use the following commands:\n"
337
0
"    truncate -s %" PRIu64 " %s\n"
338
0
"    gunzip %s\n"
339
0
"The resulting uncompressed file should be %" PRIu64 " bytes in length.\n"
340
0
"If you do not have a truncate command, skip that step (though gunzip will\n"
341
0
"likely produce a \"trailing garbage ignored\" message, which can be ignored).",
342
0
                  csize, filename, filename, usize);
343
0
    return;
344
345
1
no_sizes:
346
1
    hts_log_error(
347
1
"To decompress this file, use the following command:\n"
348
1
"    gunzip %s\n"
349
1
"This will likely produce a \"trailing garbage ignored\" message, which can\n"
350
1
"usually be safely ignored.", filename);
351
1
}
352
353
static const char *bgzf_zerr(int errnum, z_stream *zs)
354
150
{
355
150
    static char buffer[32];
356
357
    /* Return zs->msg if available.
358
       zlib doesn't set this very reliably.  Looking at the source suggests
359
       that it may get set to a useful message for deflateInit2, inflateInit2
360
       and inflate when it returns Z_DATA_ERROR. For inflate with other
361
       return codes, deflate, deflateEnd and inflateEnd it doesn't appear
362
       to be useful.  For the likely non-useful cases, the caller should
363
       pass NULL into zs. */
364
365
150
    if (zs && zs->msg) return zs->msg;
366
367
    // gzerror OF((gzFile file, int *errnum))
368
6
    switch (errnum) {
369
6
    case Z_ERRNO:
370
6
        return strerror(errno);
371
0
    case Z_STREAM_ERROR:
372
0
        return "invalid parameter/compression level, or inconsistent stream state";
373
0
    case Z_DATA_ERROR:
374
0
        return "invalid or incomplete IO";
375
0
    case Z_MEM_ERROR:
376
0
        return "out of memory";
377
0
    case Z_BUF_ERROR:
378
0
        return "progress temporarily not possible, or in() / out() returned an error";
379
0
    case Z_VERSION_ERROR:
380
0
        return "zlib version mismatch";
381
0
    case Z_NEED_DICT:
382
0
        return "data was compressed using a dictionary";
383
0
    case Z_OK: // 0: maybe gzgets error Z_NULL
384
0
    default:
385
0
        snprintf(buffer, sizeof(buffer), "[%d] unknown", errnum);
386
0
        return buffer;  // FIXME: Not thread-safe.
387
6
    }
388
6
}
389
390
static BGZF *bgzf_read_init(hFILE *hfpr, const char *filename)
391
1.19k
{
392
1.19k
    BGZF *fp;
393
1.19k
    uint8_t magic[18];
394
1.19k
    ssize_t n = hpeek(hfpr, magic, 18);
395
1.19k
    if (n < 0) return NULL;
396
397
1.19k
    fp = (BGZF*)calloc(1, sizeof(BGZF));
398
1.19k
    if (fp == NULL) return NULL;
399
400
1.19k
    fp->is_write = 0;
401
1.19k
    fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
402
1.19k
    if (fp->uncompressed_block == NULL) { free(fp); return NULL; }
403
1.19k
    fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
404
1.19k
    fp->is_compressed = (n==18 && magic[0]==0x1f && magic[1]==0x8b);
405
1.19k
    fp->is_gzip = ( !fp->is_compressed || ((magic[3]&4) && memcmp(&magic[12], "BC\2\0",4)==0) ) ? 0 : 1;
406
1.19k
    if (fp->is_compressed && (magic[3]&4) && memcmp(&magic[12], "RAZF", 4)==0) {
407
1
        hts_log_error("Cannot decompress legacy RAZF format");
408
1
        razf_info(hfpr, filename);
409
1
        free(fp->uncompressed_block);
410
1
        free(fp);
411
1
        errno = EFTYPE;
412
1
        return NULL;
413
1
    }
414
1.18k
#ifdef BGZF_CACHE
415
1.18k
    if (!(fp->cache = malloc(sizeof(*fp->cache)))) {
416
0
        free(fp->uncompressed_block);
417
0
        free(fp);
418
0
        return NULL;
419
0
    }
420
1.18k
    if (!(fp->cache->h = kh_init(cache))) {
421
0
        free(fp->uncompressed_block);
422
0
        free(fp->cache);
423
0
        free(fp);
424
0
        return NULL;
425
0
    }
426
1.18k
    fp->cache->last_pos = 0;
427
1.18k
#endif
428
1.18k
    return fp;
429
1.18k
}
430
431
// get the compress level from the mode string: compress_level==-1 for the default level, -2 for plain uncompressed
432
static int mode2level(const char *mode)
433
0
{
434
0
    int i, compress_level = -1;
435
0
    for (i = 0; mode[i]; ++i)
436
0
        if (mode[i] >= '0' && mode[i] <= '9') break;
437
0
    if (mode[i]) compress_level = (int)mode[i] - '0';
438
0
    if (strchr(mode, 'u')) compress_level = -2;
439
0
    return compress_level;
440
0
}
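/* Worked examples for mode2level() above (illustrative values, derived from a
   reading of the code rather than any separate specification):
     mode2level("wb")   == -1   no digit and no 'u': use the default level
     mode2level("wb9")  ==  9   the first digit found is taken as the level
     mode2level("wbu")  == -2   'u' requests plain uncompressed output
     mode2level("wb0u") == -2   'u' takes precedence over any digit
*/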
441
static BGZF *bgzf_write_init(const char *mode)
442
0
{
443
0
    BGZF *fp;
444
0
    fp = (BGZF*)calloc(1, sizeof(BGZF));
445
0
    if (fp == NULL) goto mem_fail;
446
0
    fp->is_write = 1;
447
0
    int compress_level = mode2level(mode);
448
0
    if ( compress_level==-2 )
449
0
    {
450
0
        fp->is_compressed = 0;
451
0
        return fp;
452
0
    }
453
0
    fp->is_compressed = 1;
454
455
0
    fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
456
0
    if (fp->uncompressed_block == NULL) goto mem_fail;
457
0
    fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
458
459
0
    fp->compress_level = compress_level < 0? Z_DEFAULT_COMPRESSION : compress_level; // Z_DEFAULT_COMPRESSION==-1
460
0
    if (fp->compress_level > 9) fp->compress_level = Z_DEFAULT_COMPRESSION;
461
0
    if ( strchr(mode,'g') )
462
0
    {
463
        // gzip output
464
0
        fp->is_gzip = 1;
465
0
        fp->gz_stream = (z_stream*)calloc(1,sizeof(z_stream));
466
0
        if (fp->gz_stream == NULL) goto mem_fail;
467
0
        fp->gz_stream->zalloc = NULL;
468
0
        fp->gz_stream->zfree  = NULL;
469
0
        fp->gz_stream->msg    = NULL;
470
471
0
        int ret = deflateInit2(fp->gz_stream, fp->compress_level, Z_DEFLATED, 15|16, 8, Z_DEFAULT_STRATEGY);
472
0
        if (ret!=Z_OK) {
473
0
            hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret, fp->gz_stream));
474
0
            goto fail;
475
0
        }
476
0
    }
477
0
    return fp;
478
479
0
mem_fail:
480
0
    hts_log_error("%s", strerror(errno));
481
482
0
fail:
483
0
    if (fp != NULL) {
484
0
        free(fp->uncompressed_block);
485
0
        free(fp->gz_stream);
486
0
        free(fp);
487
0
    }
488
0
    return NULL;
489
0
}
490
491
BGZF *bgzf_open(const char *path, const char *mode)
492
0
{
493
0
    BGZF *fp = 0;
494
0
    if (strchr(mode, 'r')) {
495
0
        hFILE *fpr;
496
0
        if ((fpr = hopen(path, mode)) == 0) return 0;
497
0
        fp = bgzf_read_init(fpr, path);
498
0
        if (fp == 0) { hclose_abruptly(fpr); return NULL; }
499
0
        fp->fp = fpr;
500
0
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
501
0
        hFILE *fpw;
502
0
        if ((fpw = hopen(path, mode)) == 0) return 0;
503
0
        fp = bgzf_write_init(mode);
504
0
        if (fp == NULL) return NULL;
505
0
        fp->fp = fpw;
506
0
    }
507
0
    else { errno = EINVAL; return 0; }
508
509
0
    fp->is_be = ed_is_big();
510
0
    return fp;
511
0
}
512
513
BGZF *bgzf_dopen(int fd, const char *mode)
514
0
{
515
0
    BGZF *fp = 0;
516
0
    if (strchr(mode, 'r')) {
517
0
        hFILE *fpr;
518
0
        if ((fpr = hdopen(fd, mode)) == 0) return 0;
519
0
        fp = bgzf_read_init(fpr, NULL);
520
0
        if (fp == 0) { hclose_abruptly(fpr); return NULL; } // FIXME this closes fd
521
0
        fp->fp = fpr;
522
0
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
523
0
        hFILE *fpw;
524
0
        if ((fpw = hdopen(fd, mode)) == 0) return 0;
525
0
        fp = bgzf_write_init(mode);
526
0
        if (fp == NULL) return NULL;
527
0
        fp->fp = fpw;
528
0
    }
529
0
    else { errno = EINVAL; return 0; }
530
531
0
    fp->is_be = ed_is_big();
532
0
    return fp;
533
0
}
534
535
BGZF *bgzf_hopen(hFILE *hfp, const char *mode)
536
1.19k
{
537
1.19k
    BGZF *fp = NULL;
538
1.19k
    if (strchr(mode, 'r')) {
539
1.19k
        fp = bgzf_read_init(hfp, NULL);
540
1.19k
        if (fp == NULL) return NULL;
541
1.19k
    } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
542
0
        fp = bgzf_write_init(mode);
543
0
        if (fp == NULL) return NULL;
544
0
    }
545
0
    else { errno = EINVAL; return 0; }
546
547
1.18k
    fp->fp = hfp;
548
1.18k
    fp->is_be = ed_is_big();
549
1.18k
    return fp;
550
1.19k
}
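/* A minimal usage sketch for bgzf_hopen() (illustrative only; the function
   name is hypothetical and error handling is kept short).  On success
   bgzf_hopen() takes ownership of the hFILE, and bgzf_close() will close it. */
static ssize_t read_some_bgzf_bytes(const char *path, void *buf, size_t len)
{
    hFILE *hf = hopen(path, "r");
    if (!hf) return -1;
    BGZF *bg = bgzf_hopen(hf, "r");
    if (!bg) { hclose_abruptly(hf); return -1; } // bgzf_hopen does not close hf on failure
    ssize_t n = bgzf_read(bg, buf, len);         // decompressed bytes, or -1 on error
    if (bgzf_close(bg) < 0 && n >= 0) n = -1;    // also closes the underlying hFILE
    return n;
}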
551
552
#ifdef HAVE_LIBDEFLATE
553
int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
554
{
555
    if (slen == 0) {
556
        // EOF block
557
        if (*dlen < 28) return -1;
558
        memcpy(_dst, "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", 28);
559
        *dlen = 28;
560
        return 0;
561
    }
562
563
    uint8_t *dst = (uint8_t*)_dst;
564
565
    if (level == 0) {
566
        // Uncompressed data
567
        if (*dlen < slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH) return -1;
568
        dst[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
569
        u16_to_le(slen,  &dst[BLOCK_HEADER_LENGTH+1]); // length
570
        u16_to_le(~slen, &dst[BLOCK_HEADER_LENGTH+3]); // ones-complement length
571
        memcpy(dst + BLOCK_HEADER_LENGTH+5, src, slen);
572
        *dlen = slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
573
574
    } else {
575
        level = level > 0 ? level : 6; // libdeflate doesn't honour -1 as default
576
        // NB levels go up to 12 here.
577
        int lvl_map[] = {0,1,2,3,5,6,7,8,10,12};
578
        level = lvl_map[level>9 ?9 :level];
579
        struct libdeflate_compressor *z = libdeflate_alloc_compressor(level);
580
        if (!z) return -1;
581
582
        // Raw deflate
583
        size_t clen =
584
            libdeflate_deflate_compress(z, src, slen,
585
                                        dst + BLOCK_HEADER_LENGTH,
586
                                        *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH);
587
588
        if (clen <= 0) {
589
            hts_log_error("Call to libdeflate_deflate_compress failed");
590
            libdeflate_free_compressor(z);
591
            return -1;
592
        }
593
594
        *dlen = clen + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
595
596
        libdeflate_free_compressor(z);
597
    }
598
599
    // write the header
600
    memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
601
    packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
602
603
    // write the footer
604
    uint32_t crc = libdeflate_crc32(0, src, slen);
605
    packInt32((uint8_t*)&dst[*dlen - 8], crc);
606
    packInt32((uint8_t*)&dst[*dlen - 4], slen);
607
    return 0;
608
}
609
610
#else
611
612
int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
613
0
{
614
0
    uint32_t crc;
615
0
    z_stream zs;
616
0
    uint8_t *dst = (uint8_t*)_dst;
617
618
0
    if (level == 0) {
619
0
    uncomp:
620
        // Uncompressed data
621
0
        if (*dlen < slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH) return -1;
622
0
        dst[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
623
0
        u16_to_le(slen,  &dst[BLOCK_HEADER_LENGTH+1]); // length
624
0
        u16_to_le(~slen, &dst[BLOCK_HEADER_LENGTH+3]); // ones-complement length
625
0
        memcpy(dst + BLOCK_HEADER_LENGTH+5, src, slen);
626
0
        *dlen = slen+5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
627
0
    } else {
628
        // compress the body
629
0
        zs.zalloc = NULL; zs.zfree = NULL;
630
0
        zs.msg = NULL;
631
0
        zs.next_in  = (Bytef*)src;
632
0
        zs.avail_in = slen;
633
0
        zs.next_out = dst + BLOCK_HEADER_LENGTH;
634
0
        zs.avail_out = *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
635
0
        int ret = deflateInit2(&zs, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); // -15 to disable zlib header/footer
636
0
        if (ret!=Z_OK) {
637
0
            hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret, &zs));
638
0
            return -1;
639
0
        }
640
0
        if ((ret = deflate(&zs, Z_FINISH)) != Z_STREAM_END) {
641
0
            if (ret == Z_OK && zs.avail_out == 0) {
642
0
                deflateEnd(&zs);
643
0
                goto uncomp;
644
0
            } else {
645
0
                hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
646
0
            }
647
0
            return -1;
648
0
        }
649
        // If we used up the entire output buffer, then we either ran out of
650
        // room or we *just* fitted, but either way we may as well store
651
        // uncompressed for faster decode.
652
0
        if (zs.avail_out == 0) {
653
0
            deflateEnd(&zs);
654
0
            goto uncomp;
655
0
        }
656
0
        if ((ret = deflateEnd(&zs)) != Z_OK) {
657
0
            hts_log_error("Call to deflateEnd failed: %s", bgzf_zerr(ret, NULL));
658
0
            return -1;
659
0
        }
660
0
        *dlen = zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
661
0
    }
662
663
    // write the header
664
0
    memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
665
0
    packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
666
    // write the footer
667
0
    crc = crc32(crc32(0L, NULL, 0L), (Bytef*)src, slen);
668
0
    packInt32((uint8_t*)&dst[*dlen - 8], crc);
669
0
    packInt32((uint8_t*)&dst[*dlen - 4], slen);
670
0
    return 0;
671
0
}
672
#endif // HAVE_LIBDEFLATE
673
674
static int bgzf_gzip_compress(BGZF *fp, void *_dst, size_t *dlen, const void *src, size_t slen, int level)
675
0
{
676
0
    uint8_t *dst = (uint8_t*)_dst;
677
0
    z_stream *zs = fp->gz_stream;
678
0
    int flush = slen ? Z_PARTIAL_FLUSH : Z_FINISH;
679
0
    zs->next_in   = (Bytef*)src;
680
0
    zs->avail_in  = slen;
681
0
    zs->next_out  = dst;
682
0
    zs->avail_out = *dlen;
683
0
    int ret = deflate(zs, flush);
684
0
    if (ret == Z_STREAM_ERROR) {
685
0
        hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret, NULL));
686
0
        return -1;
687
0
    }
688
0
    if (zs->avail_in != 0) {
689
0
        hts_log_error("Deflate block too large for output buffer");
690
0
        return -1;
691
0
    }
692
0
    *dlen = *dlen - zs->avail_out;
693
0
    return 0;
694
0
}
695
696
// Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
697
static int deflate_block(BGZF *fp, int block_length)
698
0
{
699
0
    size_t comp_size = BGZF_MAX_BLOCK_SIZE;
700
0
    int ret;
701
0
    if ( !fp->is_gzip )
702
0
        ret = bgzf_compress(fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
703
0
    else
704
0
        ret = bgzf_gzip_compress(fp, fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
705
706
0
    if ( ret != 0 )
707
0
    {
708
0
        hts_log_debug("Compression error %d", ret);
709
0
        fp->errcode |= BGZF_ERR_ZLIB;
710
0
        return -1;
711
0
    }
712
0
    fp->block_offset = 0;
713
0
    return comp_size;
714
0
}
715
716
#ifdef HAVE_LIBDEFLATE
717
718
static int bgzf_uncompress(uint8_t *dst, size_t *dlen,
719
                           const uint8_t *src, size_t slen,
720
                           uint32_t expected_crc) {
721
    struct libdeflate_decompressor *z = libdeflate_alloc_decompressor();
722
    if (!z) {
723
        hts_log_error("Call to libdeflate_alloc_decompressor failed");
724
        return -1;
725
    }
726
727
    int ret = libdeflate_deflate_decompress(z, src, slen, dst, *dlen, dlen);
728
    libdeflate_free_decompressor(z);
729
730
    if (ret != LIBDEFLATE_SUCCESS) {
731
        hts_log_error("Inflate operation failed: %d", ret);
732
        return -1;
733
    }
734
735
    uint32_t crc = libdeflate_crc32(0, (unsigned char *)dst, *dlen);
736
    if (crc != expected_crc) {
737
        hts_log_error("CRC32 checksum mismatch");
738
        return -2;
739
    }
740
741
    return 0;
742
}
743
744
#else
745
746
static int bgzf_uncompress(uint8_t *dst, size_t *dlen,
747
                           const uint8_t *src, size_t slen,
748
1.09k
                           uint32_t expected_crc) {
749
1.09k
    z_stream zs = {
750
1.09k
        .zalloc = NULL,
751
1.09k
        .zfree = NULL,
752
1.09k
        .msg = NULL,
753
1.09k
        .next_in = (Bytef*)src,
754
1.09k
        .avail_in = slen,
755
1.09k
        .next_out = (Bytef*)dst,
756
1.09k
        .avail_out = *dlen
757
1.09k
    };
758
759
1.09k
    int ret = inflateInit2(&zs, -15);
760
1.09k
    if (ret != Z_OK) {
761
0
        hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret, &zs));
762
0
        return -1;
763
0
    }
764
1.09k
    if ((ret = inflate(&zs, Z_FINISH)) != Z_STREAM_END) {
765
1
        hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
766
1
        if ((ret = inflateEnd(&zs)) != Z_OK) {
767
0
            hts_log_warning("Call to inflateEnd failed: %s", bgzf_zerr(ret, NULL));
768
0
        }
769
1
        return -1;
770
1
    }
771
1.09k
    if ((ret = inflateEnd(&zs)) != Z_OK) {
772
0
        hts_log_error("Call to inflateEnd failed: %s", bgzf_zerr(ret, NULL));
773
0
        return -1;
774
0
    }
775
1.09k
    *dlen = *dlen - zs.avail_out;
776
777
1.09k
    uint32_t crc = crc32(crc32(0L, NULL, 0L), (unsigned char *)dst, *dlen);
778
1.09k
    if (crc != expected_crc) {
779
5
        hts_log_error("CRC32 checksum mismatch");
780
5
        return -2;
781
5
    }
782
783
1.09k
    return 0;
784
1.09k
}
785
#endif // HAVE_LIBDEFLATE
786
787
// Inflate the block in fp->compressed_block into fp->uncompressed_block
788
static int inflate_block(BGZF* fp, int block_length)
789
1.09k
{
790
1.09k
    size_t dlen = BGZF_MAX_BLOCK_SIZE;
791
1.09k
    uint32_t crc = le_to_u32((uint8_t *)fp->compressed_block + block_length-8);
792
1.09k
    int ret = bgzf_uncompress(fp->uncompressed_block, &dlen,
793
1.09k
                              (Bytef*)fp->compressed_block + 18,
794
1.09k
                              block_length - 18, crc);
795
1.09k
    if (ret < 0) {
796
6
        if (ret == -2)
797
5
            fp->errcode |= BGZF_ERR_CRC;
798
1
        else
799
1
            fp->errcode |= BGZF_ERR_ZLIB;
800
6
        return -1;
801
6
    }
802
803
1.09k
    return dlen;
804
1.09k
}
805
806
// Decompress the next part of a non-blocked GZIP file.
807
// Return the number of uncompressed bytes read, 0 on EOF, or a negative number on error.
808
// Will fill the output buffer unless the end of the GZIP file is reached.
809
static int inflate_gzip_block(BGZF *fp)
810
110k
{
811
    // we will set this to true when we detect EOF, so we don't bang against the EOF more than once per call
812
110k
    int input_eof = 0;
813
814
    // write to the part of the output buffer after block_offset
815
110k
    fp->gz_stream->next_out = (Bytef*)fp->uncompressed_block + fp->block_offset;
816
110k
    fp->gz_stream->avail_out = BGZF_MAX_BLOCK_SIZE - fp->block_offset;
817
818
221k
    while ( fp->gz_stream->avail_out != 0 ) {
819
        // until we fill the output buffer (or hit EOF)
820
821
112k
        if ( !input_eof && fp->gz_stream->avail_in == 0 ) {
822
            // we are out of input data in the buffer. Get more.
823
2.17k
            fp->gz_stream->next_in = fp->compressed_block;
824
2.17k
            int ret = hread(fp->fp, fp->compressed_block, BGZF_BLOCK_SIZE);
825
2.17k
            if ( ret < 0 ) {
826
                // hread had an error. Pass it on.
827
0
                return ret;
828
0
            }
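/* Summary of the format sniffing in bgzf_read_init() above (a reading of the
   code, for orientation only):
     - does not start with 1f 8b                        -> plain uncompressed input
     - 1f 8b with FEXTRA set and "BC\2\0" at offset 12  -> BGZF
     - 1f 8b with FEXTRA set and "RAZF"  at offset 12   -> legacy RAZF, rejected (EFTYPE)
     - any other 1f 8b header                           -> ordinary gzip (is_gzip = 1)
*/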
829
2.17k
            fp->gz_stream->avail_in = ret;
830
2.17k
            if ( fp->gz_stream->avail_in < BGZF_BLOCK_SIZE ) {
831
                // we have reached EOF but the decompressor hasn't necessarily
832
905
                input_eof = 1;
833
905
            }
834
2.17k
        }
835
836
112k
        fp->gz_stream->msg = NULL;
837
        // decompress as much data as we can
838
112k
        int ret = inflate(fp->gz_stream, Z_SYNC_FLUSH);
839
840
112k
        if ( (ret < 0 && ret != Z_BUF_ERROR) || ret == Z_NEED_DICT ) {
841
            // an error occurred, other than running out of space
842
143
            hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? fp->gz_stream : NULL));
843
143
            fp->errcode |= BGZF_ERR_ZLIB;
844
143
            return -1;
845
112k
        } else if ( ret == Z_STREAM_END ) {
846
            // we finished a GZIP member
847
848
            // scratch for peeking to see if the file is over
849
0
            char c;
850
0
            if (fp->gz_stream->avail_in > 0 || hpeek(fp->fp, &c, 1) == 1) {
851
                // there is more data; try and read another GZIP member in the remaining data
852
0
                int reset_ret = inflateReset(fp->gz_stream);
853
0
                if (reset_ret != Z_OK) {
854
0
                    hts_log_error("Call to inflateReset failed: %s", bgzf_zerr(reset_ret, NULL));
855
0
                    fp->errcode |= BGZF_ERR_ZLIB;
856
0
                    return -1;
857
0
                }
858
0
            } else {
859
                // we consumed all the input data and hit Z_STREAM_END
860
                // so stop looping, even if we never fill the output buffer
861
0
                break;
862
0
            }
863
112k
        } else if ( ret == Z_BUF_ERROR && input_eof && fp->gz_stream->avail_out > 0 ) {
864
            // the gzip file has ended prematurely
865
231
            hts_log_error("Gzip file truncated");
866
231
            fp->errcode |= BGZF_ERR_IO;
867
231
            return -1;
868
231
        }
869
112k
    }
870
871
    // when we get here, the buffer is full or there is an EOF after a complete gzip member
872
109k
    return BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
873
110k
}
874
875
// Returns: 0 on success (BGZF header); -1 on non-BGZF GZIP header; -2 on error
876
static int check_header(const uint8_t *header)
877
1.97k
{
878
1.97k
    if ( header[0] != 31 || header[1] != 139 || header[2] != 8 ) return -2;
879
1.96k
    return ((header[3] & 4) != 0
880
1.96k
            && unpackInt16((uint8_t*)&header[10]) == 6
881
1.96k
            && header[12] == 'B' && header[13] == 'C'
882
1.96k
            && unpackInt16((uint8_t*)&header[14]) == 2) ? 0 : -1;
883
1.97k
}
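/* For reference, the byte offsets tested by check_header() above, matching
   the header diagram near the top of this file (a reading of the code):
     header[0..1]   == 31,139    gzip magic
     header[2]      == 8         CM: deflate
     header[3] & 4                FLG.FEXTRA set
     header[10..11] == 6         XLEN, length of the extra field
     header[12..13] == 'B','C'   BGZF subfield identifier
     header[14..15] == 2         subfield data length (the 2-byte BSIZE)
*/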
884
885
#ifdef BGZF_CACHE
886
static void free_cache(BGZF *fp)
887
1.18k
{
888
1.18k
    khint_t k;
889
1.18k
    if (fp->is_write) return;
890
1.18k
    khash_t(cache) *h = fp->cache->h;
891
1.18k
    for (k = kh_begin(h); k < kh_end(h); ++k)
892
0
        if (kh_exist(h, k)) free(kh_val(h, k).block);
893
1.18k
    kh_destroy(cache, h);
894
1.18k
    free(fp->cache);
895
1.18k
}
896
897
static int load_block_from_cache(BGZF *fp, int64_t block_address)
898
0
{
899
0
    khint_t k;
900
0
    cache_t *p;
901
902
0
    khash_t(cache) *h = fp->cache->h;
903
0
    k = kh_get(cache, h, block_address);
904
0
    if (k == kh_end(h)) return 0;
905
0
    p = &kh_val(h, k);
906
0
    if (fp->block_length != 0) fp->block_offset = 0;
907
0
    fp->block_address = block_address;
908
0
    fp->block_length = p->size;
909
0
    memcpy(fp->uncompressed_block, p->block, p->size);
910
0
    if ( hseek(fp->fp, p->end_offset, SEEK_SET) < 0 )
911
0
    {
912
        // todo: move the error up
913
0
        hts_log_error("Could not hseek to %" PRId64, p->end_offset);
914
0
        exit(1);
915
0
    }
916
0
    return p->size;
917
0
}
918
919
static void cache_block(BGZF *fp, int size)
920
700
{
921
700
    int ret;
922
700
    khint_t k, k_orig;
923
700
    uint8_t *block = NULL;
924
700
    cache_t *p;
925
    //fprintf(stderr, "Cache block at %llx\n", (int)fp->block_address);
926
700
    khash_t(cache) *h = fp->cache->h;
927
700
    if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return;
928
0
    if (fp->block_length < 0 || fp->block_length > BGZF_MAX_BLOCK_SIZE) return;
929
0
    if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > (uint32_t)fp->cache_size) {
930
        /* Remove uniformly from any position in the hash by a simple
931
         * round-robin approach.  An alternative strategy would be to
932
         * remove the least recently accessed block, but the round-robin
933
         * removal is simpler and is not expected to have a big impact
934
         * on performance */
935
0
        if (fp->cache->last_pos >= kh_end(h)) fp->cache->last_pos = kh_begin(h);
936
0
        k_orig = k = fp->cache->last_pos;
937
0
        if (++k >= kh_end(h)) k = kh_begin(h);
938
0
        while (k != k_orig) {
939
0
            if (kh_exist(h, k))
940
0
                break;
941
0
            if (++k == kh_end(h))
942
0
                k = kh_begin(h);
943
0
        }
944
0
        fp->cache->last_pos = k;
945
946
0
        if (k != k_orig) {
947
0
            block = kh_val(h, k).block;
948
0
            kh_del(cache, h, k);
949
0
        }
950
0
    } else {
951
0
        block = (uint8_t*)malloc(BGZF_MAX_BLOCK_SIZE);
952
0
    }
953
0
    if (!block) return;
954
0
    k = kh_put(cache, h, fp->block_address, &ret);
955
0
    if (ret <= 0) { // kh_put failed, or in there already (shouldn't happen)
956
0
        free(block);
957
0
        return;
958
0
    }
959
0
    p = &kh_val(h, k);
960
0
    p->size = fp->block_length;
961
0
    p->end_offset = fp->block_address + size;
962
0
    p->block = block;
963
0
    memcpy(p->block, fp->uncompressed_block, p->size);
964
0
}
965
#else
966
static void free_cache(BGZF *fp) {}
967
static int load_block_from_cache(BGZF *fp, int64_t block_address) {return 0;}
968
static void cache_block(BGZF *fp, int size) {}
969
#endif
970
971
/*
972
 * Absolute htell in this compressed file.
973
 *
974
 * Do not confuse with the external bgzf_tell macro which returns the virtual
975
 * offset.
976
 */
977
221k
static off_t bgzf_htell(BGZF *fp) {
978
221k
    if (fp->mt) {
979
0
        pthread_mutex_lock(&fp->mt->job_pool_m);
980
0
        off_t pos = fp->block_address + fp->block_clength;
981
0
        pthread_mutex_unlock(&fp->mt->job_pool_m);
982
0
        return pos;
983
221k
    } else {
984
221k
        return htell(fp->fp);
985
221k
    }
986
221k
}
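/* For contrast with bgzf_htell() above: the public bgzf_tell() macro in
   htslib/bgzf.h returns a "virtual file offset", packing the compressed
   offset of the block together with the position within its uncompressed
   data, roughly:

       voffset = (block_address << 16) | (block_offset & 0xffff);

   The lower 16 bits address a byte inside the decompressed block; the upper
   bits give the file offset at which that block starts.  This is also why
   bgzf_idx_push() above stores only (offset & 0xffff) and bgzf_idx_flush()
   adds (block_address << 16) later, once the compressed position is known. */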
987
988
int bgzf_read_block(BGZF *fp)
989
111k
{
990
111k
    hts_tpool_result *r;
991
992
111k
    if (fp->errcode) return -1;
993
994
111k
    if (fp->mt) {
995
0
    again:
996
0
        if (fp->mt->hit_eof) {
997
            // Further reading at EOF will always return 0
998
0
            fp->block_length = 0;
999
0
            return 0;
1000
0
        }
1001
0
        r = hts_tpool_next_result_wait(fp->mt->out_queue);
1002
0
        bgzf_job *j = r ? (bgzf_job *)hts_tpool_result_data(r) : NULL;
1003
1004
0
        if (!j || j->errcode == BGZF_ERR_MT) {
1005
0
            if (!fp->mt->free_block) {
1006
0
                fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
1007
0
                if (fp->uncompressed_block == NULL) return -1;
1008
0
                fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
1009
0
            } // else it's already allocated with malloc, maybe even in-use.
1010
0
            if (mt_destroy(fp->mt) < 0) {
1011
0
                fp->errcode = BGZF_ERR_IO;
1012
0
            }
1013
0
            fp->mt = NULL;
1014
0
            hts_tpool_delete_result(r, 0);
1015
0
            if (fp->errcode) {
1016
0
                return -1;
1017
0
            }
1018
0
            goto single_threaded;
1019
0
        }
1020
1021
0
        if (j->errcode) {
1022
0
            fp->errcode = j->errcode;
1023
0
            hts_log_error("BGZF decode jobs returned error %d "
1024
0
                          "for block offset %"PRId64,
1025
0
                          j->errcode, j->block_address);
1026
0
            hts_tpool_delete_result(r, 0);
1027
0
            return -1;
1028
0
        }
1029
1030
0
        if (j->hit_eof) {
1031
0
            if (!fp->last_block_eof && !fp->no_eof_block) {
1032
0
                fp->no_eof_block = 1;
1033
0
                hts_log_warning("EOF marker is absent. The input may be truncated");
1034
0
            }
1035
0
            fp->mt->hit_eof = 1;
1036
0
        }
1037
1038
        // Zero length blocks in the middle of a file are (wrongly)
1039
        // considered as EOF by many callers.  We work around this by
1040
        // trying again to see if we hit a genuine EOF.
1041
0
        if (!j->hit_eof && j->uncomp_len == 0) {
1042
0
            fp->last_block_eof = 1;
1043
0
            hts_tpool_delete_result(r, 0);
1044
0
            goto again;
1045
0
        }
1046
1047
        // block_length=0 and block_offset set by bgzf_seek.
1048
0
        if (fp->block_length != 0) fp->block_offset = 0;
1049
0
        if (!j->hit_eof) fp->block_address = j->block_address;
1050
0
        fp->block_clength = j->comp_len;
1051
0
        fp->block_length = j->uncomp_len;
1052
        // bgzf_read() can change fp->block_length
1053
0
        fp->last_block_eof = (fp->block_length == 0);
1054
1055
0
        if ( j->uncomp_len && j->fp->idx_build_otf )
1056
0
        {
1057
0
            bgzf_index_add_block(j->fp);
1058
0
            j->fp->idx->ublock_addr += j->uncomp_len;
1059
0
        }
1060
1061
        // Steal the data block as it's quicker than a memcpy.
1062
        // We just need to make sure we delay the pool free.
1063
0
        if (fp->mt->curr_job) {
1064
0
            pthread_mutex_lock(&fp->mt->job_pool_m);
1065
0
            pool_free(fp->mt->job_pool, fp->mt->curr_job);
1066
0
            pthread_mutex_unlock(&fp->mt->job_pool_m);
1067
0
        }
1068
0
        fp->uncompressed_block = j->uncomp_data;
1069
0
        fp->mt->curr_job = j;
1070
0
        if (fp->mt->free_block) {
1071
0
            free(fp->mt->free_block); // clear up last non-mt block
1072
0
            fp->mt->free_block = NULL;
1073
0
        }
1074
1075
0
        hts_tpool_delete_result(r, 0);
1076
0
        return 0;
1077
0
    }
1078
1079
111k
    uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
1080
111k
    int count, size, block_length, remaining;
1081
1082
111k
 single_threaded:
1083
111k
    size = 0;
1084
1085
111k
    int64_t block_address;
1086
111k
    block_address = bgzf_htell(fp);
1087
1088
    // Reading an uncompressed file
1089
111k
    if ( !fp->is_compressed )
1090
444
    {
1091
444
        count = hread(fp->fp, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
1092
444
        if (count < 0)  // Error
1093
0
        {
1094
0
            hts_log_error("Failed to read uncompressed data "
1095
0
                          "at offset %"PRId64"%s%s",
1096
0
                          block_address, errno ? ": " : "", strerror(errno));
1097
0
            fp->errcode |= BGZF_ERR_IO;
1098
0
            return -1;
1099
0
        }
1100
444
        else if (count == 0)  // EOF
1101
62
        {
1102
62
            fp->block_length = 0;
1103
62
            return 0;
1104
62
        }
1105
382
        if (fp->block_length != 0) fp->block_offset = 0;
1106
382
        fp->block_address = block_address;
1107
382
        fp->block_length = count;
1108
382
        return 0;
1109
444
    }
1110
1111
    // Reading compressed file
1112
110k
    if ( fp->is_gzip && fp->gz_stream ) // is this an initialized gzip stream?
1113
109k
    {
1114
109k
        count = inflate_gzip_block(fp);
1115
109k
        if ( count<0 )
1116
354
        {
1117
354
            hts_log_error("Reading GZIP stream failed at offset %"PRId64,
1118
354
                          block_address);
1119
354
            fp->errcode |= BGZF_ERR_ZLIB;
1120
354
            return -1;
1121
354
        }
1122
108k
        fp->block_length = count;
1123
108k
        fp->block_address = block_address;
1124
108k
        return 0;
1125
109k
    }
1126
1.58k
    if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
1127
1128
    // loop to skip empty bgzf blocks
1129
1.97k
    while (1)
1130
1.97k
    {
1131
1.97k
        count = hread(fp->fp, header, sizeof(header));
1132
1.97k
        if (count == 0) { // no data read
1133
0
            if (!fp->last_block_eof && !fp->no_eof_block && !fp->is_gzip) {
1134
0
                fp->no_eof_block = 1;
1135
0
                hts_log_warning("EOF marker is absent. The input may be truncated");
1136
0
            }
1137
0
            fp->block_length = 0;
1138
0
            return 0;
1139
0
        }
1140
1.97k
        int ret = 0;
1141
1.97k
        if ( count != sizeof(header) || (ret=check_header(header))==-2 )
1142
6
        {
1143
6
            fp->errcode |= BGZF_ERR_HEADER;
1144
6
            hts_log_error("%s BGZF header at offset %"PRId64,
1145
6
                          ret ? "Invalid" : "Failed to read",
1146
6
                          block_address);
1147
6
            return -1;
1148
6
        }
1149
1.96k
        if ( ret==-1 )
1150
864
        {
1151
            // GZIP, not BGZF
1152
864
            uint8_t *cblock = (uint8_t*)fp->compressed_block;
1153
864
            memcpy(cblock, header, sizeof(header));
1154
864
            count = hread(fp->fp, cblock+sizeof(header), BGZF_BLOCK_SIZE - sizeof(header)) + sizeof(header);
1155
1156
864
            fp->is_gzip = 1;
1157
864
            fp->gz_stream = (z_stream*) calloc(1,sizeof(z_stream));
1158
            // Set up zlib, using a window size of 15, and its built-in GZIP header processing (+16).
1159
864
            int ret = inflateInit2(fp->gz_stream, 15 + 16);
1160
864
            if (ret != Z_OK)
1161
0
            {
1162
0
                hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret, fp->gz_stream));
1163
0
                fp->errcode |= BGZF_ERR_ZLIB;
1164
0
                return -1;
1165
0
            }
1166
864
            fp->gz_stream->avail_in = count;
1167
864
            fp->gz_stream->next_in  = cblock;
1168
864
            count = inflate_gzip_block(fp);
1169
864
            if ( count<0 )
1170
20
            {
1171
20
                hts_log_error("Reading GZIP stream failed at offset %"PRId64,
1172
20
                              block_address);
1173
20
                fp->errcode |= BGZF_ERR_ZLIB;
1174
20
                return -1;
1175
20
            }
1176
844
            fp->block_length = count;
1177
844
            fp->block_address = block_address;
1178
844
            if ( fp->idx_build_otf ) return -1; // cannot build index for gzip
1179
844
            return 0;
1180
844
        }
1181
1.10k
        size = count;
1182
1.10k
        block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1183
1.10k
        if (block_length < BLOCK_HEADER_LENGTH)
1184
0
        {
1185
0
            hts_log_error("Invalid BGZF block length at offset %"PRId64,
1186
0
                          block_address);
1187
0
            fp->errcode |= BGZF_ERR_HEADER;
1188
0
            return -1;
1189
0
        }
1190
1.10k
        compressed_block = (uint8_t*)fp->compressed_block;
1191
1.10k
        memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1192
1.10k
        remaining = block_length - BLOCK_HEADER_LENGTH;
1193
1.10k
        count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1194
1.10k
        if (count != remaining) {
1195
5
            hts_log_error("Failed to read BGZF block data at offset %"PRId64
1196
5
                          " expected %d bytes; hread returned %d",
1197
5
                          block_address, remaining, count);
1198
5
            fp->errcode |= BGZF_ERR_IO;
1199
5
            return -1;
1200
5
        }
1201
1.09k
        size += count;
1202
1.09k
        if ((count = inflate_block(fp, block_length)) < 0) {
1203
6
            hts_log_debug("Inflate block operation failed for "
1204
6
                          "block at offset %"PRId64": %s",
1205
6
                          block_address, bgzf_zerr(count, NULL));
1206
6
            fp->errcode |= BGZF_ERR_ZLIB;
1207
6
            return -1;
1208
6
        }
1209
1.09k
        fp->last_block_eof = (count == 0);
1210
1.09k
        if ( count ) break;     // otherwise an empty bgzf block
1211
393
        block_address = bgzf_htell(fp); // update for new block start
1212
393
    }
1213
700
    if (fp->block_length != 0) fp->block_offset = 0; // Do not reset offset if this read follows a seek.
1214
700
    fp->block_address = block_address;
1215
700
    fp->block_length = count;
1216
700
    if ( fp->idx_build_otf )
1217
0
    {
1218
0
        bgzf_index_add_block(fp);
1219
0
        fp->idx->ublock_addr += count;
1220
0
    }
1221
700
    cache_block(fp, size);
1222
700
    return 0;
1223
1.58k
}
1224
1225
ssize_t bgzf_read(BGZF *fp, void *data, size_t length)
1226
8.48k
{
1227
8.48k
    ssize_t bytes_read = 0;
1228
8.48k
    uint8_t *output = (uint8_t*)data;
1229
8.48k
    if (length <= 0) return 0;
1230
7.92k
    assert(fp->is_write == 0);
1231
16.2k
    while (bytes_read < length) {
1232
8.37k
        int copy_length, available = fp->block_length - fp->block_offset;
1233
8.37k
        uint8_t *buffer;
1234
8.37k
        if (available <= 0) {
1235
786
            int ret = bgzf_read_block(fp);
1236
786
            if (ret != 0) {
1237
26
                hts_log_error("Read block operation failed with error %d after %zd of %zu bytes", fp->errcode, bytes_read, length);
1238
26
                fp->errcode |= BGZF_ERR_ZLIB;
1239
26
                return -1;
1240
26
            }
1241
760
            available = fp->block_length - fp->block_offset;
1242
760
            if (available == 0) {
1243
62
                if (fp->block_length == 0)
1244
62
                    break; // EOF
1245
1246
                // Offset was at end of block (see commit e9863a0)
1247
0
                fp->block_address = bgzf_htell(fp);
1248
0
                fp->block_offset = fp->block_length = 0;
1249
0
                continue;
1250
698
            } else if (available < 0) {
1251
                // Block offset was set to an invalid coordinate
1252
0
                hts_log_error("BGZF block offset %d set beyond block size %d",
1253
0
                              fp->block_offset, fp->block_length);
1254
0
                fp->errcode |= BGZF_ERR_MISUSE;
1255
0
                return -1;
1256
0
            }
1257
760
        }
1258
8.28k
        copy_length = length - bytes_read < available? length - bytes_read : available;
1259
8.28k
        buffer = (uint8_t*)fp->uncompressed_block;
1260
8.28k
        memcpy(output, buffer + fp->block_offset, copy_length);
1261
8.28k
        fp->block_offset += copy_length;
1262
8.28k
        output += copy_length;
1263
8.28k
        bytes_read += copy_length;
1264
1265
        // For raw gzip streams this avoids short reads.
1266
8.28k
        if (fp->block_offset == fp->block_length) {
1267
467
            fp->block_address = bgzf_htell(fp);
1268
467
            fp->block_offset = fp->block_length = 0;
1269
467
        }
1270
8.28k
    }
1271
1272
7.89k
    fp->uncompressed_address += bytes_read;
1273
1274
7.89k
    return bytes_read;
1275
7.92k
}
1276
1277
// -1 for EOF, -2 for error, 0-255 for byte.
1278
8.60M
int bgzf_peek(BGZF *fp) {
1279
8.60M
    int available = fp->block_length - fp->block_offset;
1280
8.60M
    if (available <= 0) {
1281
144
        if (bgzf_read_block(fp) < 0) {
1282
4
            hts_log_error("Read block operation failed with error %d", fp->errcode);
1283
4
            fp->errcode = BGZF_ERR_ZLIB;
1284
4
            return -2;
1285
4
        }
1286
144
    }
1287
8.60M
    available = fp->block_length - fp->block_offset;
1288
8.60M
    if (available)
1289
8.60M
        return ((unsigned char *)fp->uncompressed_block)[fp->block_offset];
1290
1291
0
    return -1;
1292
8.60M
}
1293
1294
ssize_t bgzf_raw_read(BGZF *fp, void *data, size_t length)
1295
0
{
1296
0
    ssize_t ret = hread(fp->fp, data, length);
1297
0
    if (ret < 0) fp->errcode |= BGZF_ERR_IO;
1298
0
    return ret;
1299
0
}
1300
1301
#ifdef BGZF_MT
1302
1303
/* Function to clean up when jobs are discarded (e.g. during seek)
1304
 * This works for results too, as results are the same struct with
1305
 * decompressed data stored in it. */
1306
0
static void job_cleanup(void *arg) {
1307
0
    bgzf_job *j = (bgzf_job *)arg;
1308
0
    mtaux_t *mt = j->fp->mt;
1309
0
    pthread_mutex_lock(&mt->job_pool_m);
1310
0
    pool_free(mt->job_pool, j);
1311
0
    pthread_mutex_unlock(&mt->job_pool_m);
1312
0
}
1313
1314
0
static void *bgzf_encode_func(void *arg) {
1315
0
    bgzf_job *j = (bgzf_job *)arg;
1316
1317
0
    j->comp_len = BGZF_MAX_BLOCK_SIZE;
1318
0
    int ret = bgzf_compress(j->comp_data, &j->comp_len,
1319
0
                            j->uncomp_data, j->uncomp_len,
1320
0
                            j->fp->compress_level);
1321
0
    if (ret != 0)
1322
0
        j->errcode |= BGZF_ERR_ZLIB;
1323
1324
0
    return arg;
1325
0
}
1326
1327
// Optimisation for compression level 0 (uncompressed deflate blocks)
1328
// Avoids memcpy of the data from uncompressed to compressed buffer.
1329
0
static void *bgzf_encode_level0_func(void *arg) {
1330
0
    bgzf_job *j = (bgzf_job *)arg;
1331
0
    uint32_t crc;
1332
0
    j->comp_len = j->uncomp_len + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH + 5;
1333
1334
    // Data will have already been copied in to
1335
    // j->comp_data + BLOCK_HEADER_LENGTH + 5
1336
1337
    // Add preamble
1338
0
    memcpy(j->comp_data, g_magic, BLOCK_HEADER_LENGTH);
1339
0
    u16_to_le(j->comp_len-1, j->comp_data + 16);
1340
1341
    // Deflate uncompressed data header
1342
0
    j->comp_data[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
1343
0
    u16_to_le(j->uncomp_len, j->comp_data + BLOCK_HEADER_LENGTH + 1);
1344
0
    u16_to_le(~j->uncomp_len, j->comp_data + BLOCK_HEADER_LENGTH + 3);
1345
1346
    // Trailer (CRC, uncompressed length)
1347
#ifdef HAVE_LIBDEFLATE
1348
    crc = libdeflate_crc32(0, j->comp_data + BLOCK_HEADER_LENGTH + 5,
1349
                           j->uncomp_len);
1350
#else
1351
0
    crc = crc32(crc32(0L, NULL, 0L),
1352
0
                (Bytef*)j->comp_data + BLOCK_HEADER_LENGTH + 5, j->uncomp_len);
1353
0
#endif
1354
0
    u32_to_le(crc, j->comp_data +  j->comp_len - 8);
1355
0
    u32_to_le(j->uncomp_len, j->comp_data + j->comp_len - 4);
1356
1357
0
    return arg;
1358
0
}
1359
1360
// Our input block has already been decoded by bgzf_mt_read_block().
1361
// We need to split that into a fetch block (compressed) and make this
1362
// do the actual decompression step.
1363
0
static void *bgzf_decode_func(void *arg) {
1364
0
    bgzf_job *j = (bgzf_job *)arg;
1365
1366
0
    j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1367
0
    uint32_t crc = le_to_u32((uint8_t *)j->comp_data + j->comp_len-8);
1368
0
    int ret = bgzf_uncompress(j->uncomp_data, &j->uncomp_len,
1369
0
                              j->comp_data+18, j->comp_len-18, crc);
1370
0
    if (ret != 0)
1371
0
        j->errcode |= BGZF_ERR_ZLIB;
1372
1373
0
    return arg;
1374
0
}
1375
1376
/*
1377
 * Nul function so we can dispatch a job with the correct serial
1378
 * to mark failure or to indicate an empty read (EOF).
1379
 */
1380
0
static void *bgzf_nul_func(void *arg) { return arg; }
1381
1382
/*
1383
 * Takes compressed blocks off the results queue and calls hwrite to
1384
 * punt them to the output stream.
1385
 *
1386
 * Returns NULL when no more are left, or -1 on error
1387
 */
1388
0
static void *bgzf_mt_writer(void *vp) {
1389
0
    BGZF *fp = (BGZF *)vp;
1390
0
    mtaux_t *mt = fp->mt;
1391
0
    hts_tpool_result *r;
1392
1393
0
    if (fp->idx_build_otf) {
1394
0
        fp->idx->moffs = fp->idx->noffs = 1;
1395
0
        fp->idx->offs = (bgzidx1_t*) calloc(fp->idx->moffs, sizeof(bgzidx1_t));
1396
0
        if (!fp->idx->offs) goto err;
1397
0
    }
1398
1399
    // Iterates until result queue is shutdown, where it returns NULL.
1400
0
    while ((r = hts_tpool_next_result_wait(mt->out_queue))) {
1401
0
        bgzf_job *j = (bgzf_job *)hts_tpool_result_data(r);
1402
0
        assert(j);
1403
1404
0
        if (fp->idx_build_otf) {
1405
0
            fp->idx->noffs++;
1406
0
            if ( fp->idx->noffs > fp->idx->moffs )
1407
0
            {
1408
0
                fp->idx->moffs = fp->idx->noffs;
1409
0
                kroundup32(fp->idx->moffs);
1410
0
                fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
1411
0
                if ( !fp->idx->offs ) goto err;
1412
0
            }
1413
0
            fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->offs[ fp->idx->noffs-2 ].uaddr + j->uncomp_len;
1414
0
            fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->idx->offs[ fp->idx->noffs-2 ].caddr + j->comp_len;
1415
0
        }
1416
1417
        // Flush any cached hts_idx_push calls
1418
0
        if (bgzf_idx_flush(fp) < 0)
1419
0
            goto err;
1420
1421
0
        if (hwrite(fp->fp, j->comp_data, j->comp_len) != j->comp_len)
1422
0
            goto err;
1423
1424
        // Update our local block_address.  Cannot be fp->block_address due to no
1425
        // locking in bgzf_tell.
1426
0
        pthread_mutex_lock(&mt->idx_m);
1427
0
        mt->block_address += j->comp_len;
1428
0
        pthread_mutex_unlock(&mt->idx_m);
1429
1430
        /*
1431
         * Periodically call hflush (which calls fsync when on a file).
1432
         * This avoids the fsync being done at the bgzf_close stage,
1433
         * which can sometimes cause significant delays.  As this is in
1434
         * a separate thread, spreading the sync delays throughout the
1435
         * program execution seems better.
1436
         * Frequency of 1/512 has been chosen by experimentation
1437
         * across local XFS, NFS and Lustre tests.
1438
         */
1439
0
        if (++mt->flush_pending % 512 == 0)
1440
0
            if (hflush(fp->fp) != 0)
1441
0
                goto err;
1442
1443
1444
0
        hts_tpool_delete_result(r, 0);
1445
1446
        // Also updated by main thread
1447
0
        pthread_mutex_lock(&mt->job_pool_m);
1448
0
        pool_free(mt->job_pool, j);
1449
0
        mt->jobs_pending--;
1450
0
        pthread_mutex_unlock(&mt->job_pool_m);
1451
0
    }
1452
1453
0
    if (hflush(fp->fp) != 0)
1454
0
        goto err;
1455
1456
0
    hts_tpool_process_destroy(mt->out_queue);
1457
1458
0
    return NULL;
1459
1460
0
 err:
1461
0
    hts_tpool_process_destroy(mt->out_queue);
1462
0
    return (void *)-1;
1463
0
}
1464
1465
1466
/*
1467
 * Reads a compressed block of data using hread and dispatches it to
1468
 * the thread pool for decompression.  This is the analogue of the old
1469
 * non-threaded bgzf_read_block() function, but without modifying fp
1470
 * in any way (except for the read offset).  All output goes via the
1471
 * supplied bgzf_job struct.
1472
 *
1473
 * Returns 0 on success, or -1 on error or end of file.
1474
 */
1475
int bgzf_mt_read_block(BGZF *fp, bgzf_job *j)
1476
0
{
1477
0
    uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
1478
0
    int count, block_length, remaining;
1479
1480
    // NOTE: Guaranteed to be compressed as we block multi-threading in
1481
    // uncompressed mode.  However it may be gzip compression instead
1482
    // of bgzf.
1483
1484
    // Reading compressed file
1485
0
    int64_t block_address;
1486
0
    block_address = htell(fp->fp);
1487
1488
0
    j->block_address = block_address;  // in case we exit with j->errcode
1489
1490
0
    if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
1491
0
    count = hpeek(fp->fp, header, sizeof(header));
1492
0
    if (count == 0) // no data read
1493
0
        return -1;
1494
0
    int ret;
1495
0
    if ( count != sizeof(header) || (ret=check_header(header))==-2 )
1496
0
    {
1497
0
        j->errcode |= BGZF_ERR_HEADER;
1498
0
        return -1;
1499
0
    }
1500
0
    if (ret == -1) {
1501
0
        j->errcode |= BGZF_ERR_MT;
1502
0
        return -1;
1503
0
    }
1504
1505
0
    count = hread(fp->fp, header, sizeof(header));
1506
0
    if (count != sizeof(header)) // no data read
1507
0
        return -1;
1508
1509
0
    block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1510
0
    if (block_length < BLOCK_HEADER_LENGTH) {
1511
0
        j->errcode |= BGZF_ERR_HEADER;
1512
0
        return -1;
1513
0
    }
1514
0
    compressed_block = (uint8_t*)j->comp_data;
1515
0
    memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1516
0
    remaining = block_length - BLOCK_HEADER_LENGTH;
1517
0
    count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1518
0
    if (count != remaining) {
1519
0
        j->errcode |= BGZF_ERR_IO;
1520
0
        return -1;
1521
0
    }
1522
0
    j->comp_len = block_length;
1523
0
    j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1524
0
    j->block_address = block_address;
1525
0
    j->fp = fp;
1526
0
    j->errcode = 0;
1527
1528
0
    return 0;
1529
0
}
1530
1531
1532
static int bgzf_check_EOF_common(BGZF *fp)
1533
139
{
1534
139
    uint8_t buf[28];
1535
139
    off_t offset = htell(fp->fp);
1536
139
    if (hseek(fp->fp, -28, SEEK_END) < 0) {
1537
0
        if (errno == ESPIPE) { hclearerr(fp->fp); return 2; }
1538
#ifdef _WIN32
1539
        if (errno == EINVAL) { hclearerr(fp->fp); return 2; }
1540
#else
1541
        // Assume that EINVAL was due to the file being less than 28 bytes
1542
        // long, rather than being a random error return from an hfile backend.
1543
        // This should be reported as "no EOF block" rather than an error.
1544
0
        if (errno == EINVAL) { hclearerr(fp->fp); return 0; }
1545
0
#endif
1546
0
        return -1;
1547
0
    }
1548
139
    if ( hread(fp->fp, buf, 28) != 28 ) return -1;
1549
139
    if ( hseek(fp->fp, offset, SEEK_SET) < 0 ) return -1;
1550
139
    return (memcmp("\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", buf, 28) == 0)? 1 : 0;
1551
139
}
1552
1553
/*
1554
 * Checks EOF from the reader thread.
1555
 */
1556
0
static void bgzf_mt_eof(BGZF *fp) {
1557
0
    mtaux_t *mt = fp->mt;
1558
1559
0
    pthread_mutex_lock(&mt->job_pool_m);
1560
0
    mt->eof = bgzf_check_EOF_common(fp);
1561
0
    pthread_mutex_unlock(&mt->job_pool_m);
1562
0
    mt->command = HAS_EOF_DONE;
1563
0
    pthread_cond_signal(&mt->command_c);
1564
0
}
1565
1566
1567
/*
1568
 * Performs the seek (called by reader thread).
1569
 *
1570
 * This simply drains the entire queue, throwing away blocks, performs the
1571
 * seek, and starts it up again.  Brute force, but maybe sufficient.
1572
 */
1573
0
static void bgzf_mt_seek(BGZF *fp) {
1574
0
    mtaux_t *mt = fp->mt;
1575
1576
0
    hts_tpool_process_reset(mt->out_queue, 0);
1577
0
    pthread_mutex_lock(&mt->job_pool_m);
1578
0
    mt->errcode = 0;
1579
1580
0
    if (hseek(fp->fp, mt->block_address, SEEK_SET) < 0)
1581
0
        mt->errcode = BGZF_ERR_IO;
1582
1583
0
    pthread_mutex_unlock(&mt->job_pool_m);
1584
0
    mt->command = SEEK_DONE;
1585
0
    pthread_cond_signal(&mt->command_c);
1586
0
}
1587
1588
0
static void *bgzf_mt_reader(void *vp) {
1589
0
    BGZF *fp = (BGZF *)vp;
1590
0
    mtaux_t *mt = fp->mt;
1591
1592
0
restart:
1593
0
    pthread_mutex_lock(&mt->job_pool_m);
1594
0
    bgzf_job *j = pool_alloc(mt->job_pool);
1595
0
    pthread_mutex_unlock(&mt->job_pool_m);
1596
0
    if (!j) goto err;
1597
0
    j->errcode = 0;
1598
0
    j->comp_len = 0;
1599
0
    j->uncomp_len = 0;
1600
0
    j->hit_eof = 0;
1601
0
    j->fp = fp;
1602
1603
0
    while (bgzf_mt_read_block(fp, j) == 0) {
1604
        // Dispatch
1605
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_decode_func, j,
1606
0
                                job_cleanup, job_cleanup, 0) < 0) {
1607
0
            job_cleanup(j);
1608
0
            goto err;
1609
0
        }
1610
1611
        // Check for command
1612
0
        pthread_mutex_lock(&mt->command_m);
1613
0
        switch (mt->command) {
1614
0
        case SEEK:
1615
0
            bgzf_mt_seek(fp);  // Sets mt->command to SEEK_DONE
1616
0
            pthread_mutex_unlock(&mt->command_m);
1617
0
            goto restart;
1618
1619
0
        case HAS_EOF:
1620
0
            bgzf_mt_eof(fp);   // Sets mt->command to HAS_EOF_DONE
1621
0
            break;
1622
1623
0
        case SEEK_DONE:
1624
0
        case HAS_EOF_DONE:
1625
0
            pthread_cond_signal(&mt->command_c);
1626
0
            break;
1627
1628
0
        case CLOSE:
1629
0
            pthread_cond_signal(&mt->command_c);
1630
0
            pthread_mutex_unlock(&mt->command_m);
1631
0
            hts_tpool_process_destroy(mt->out_queue);
1632
0
            return NULL;
1633
1634
0
        default:
1635
0
            break;
1636
0
        }
1637
0
        pthread_mutex_unlock(&mt->command_m);
1638
1639
        // Allocate buffer for next block
1640
0
        pthread_mutex_lock(&mt->job_pool_m);
1641
0
        j = pool_alloc(mt->job_pool);
1642
0
        pthread_mutex_unlock(&mt->job_pool_m);
1643
0
        if (!j) {
1644
0
            hts_tpool_process_destroy(mt->out_queue);
1645
0
            return NULL;
1646
0
        }
1647
0
        j->errcode = 0;
1648
0
        j->comp_len = 0;
1649
0
        j->uncomp_len = 0;
1650
0
        j->hit_eof = 0;
1651
0
        j->fp = fp;
1652
0
    }
1653
1654
0
    if (j->errcode == BGZF_ERR_MT) {
1655
        // An attempt to multi-thread decode a raw gzip stream cannot succeed.
1656
        // We tear down the multi-threaded decoder and revert to the old code.
1657
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_nul_func, j,
1658
0
                                job_cleanup, job_cleanup, 0) < 0) {
1659
0
            job_cleanup(j);
1660
0
            hts_tpool_process_destroy(mt->out_queue);
1661
0
            return NULL;
1662
0
        }
1663
0
        hts_tpool_process_ref_decr(mt->out_queue);
1664
0
        return &j->errcode;
1665
0
    }
1666
1667
    // Dispatch an empty block so EOF is spotted.
1668
    // We also use this mechanism for returning errors, in which case
1669
    // j->errcode is set already.
1670
1671
0
    j->hit_eof = 1;
1672
0
    if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_nul_func, j,
1673
0
                            job_cleanup, job_cleanup, 0) < 0) {
1674
0
        job_cleanup(j);
1675
0
        hts_tpool_process_destroy(mt->out_queue);
1676
0
        return NULL;
1677
0
    }
1678
0
    if (j->errcode != 0) {
1679
0
        hts_tpool_process_destroy(mt->out_queue);
1680
0
        return &j->errcode;
1681
0
    }
1682
1683
    // We hit EOF so can stop reading, but we may get a subsequent
1684
    // seek request.  In this case we need to restart the reader.
1685
    //
1686
    // To handle this we wait on a condition variable and then
1687
    // monitor the command. (This could be either seek or close.)
1688
0
    for (;;) {
1689
0
        pthread_mutex_lock(&mt->command_m);
1690
0
        if (mt->command == NONE)
1691
0
            pthread_cond_wait(&mt->command_c, &mt->command_m);
1692
0
        switch(mt->command) {
1693
0
        default:
1694
0
            pthread_mutex_unlock(&mt->command_m);
1695
0
            break;
1696
1697
0
        case SEEK:
1698
0
            bgzf_mt_seek(fp);
1699
0
            pthread_mutex_unlock(&mt->command_m);
1700
0
            goto restart;
1701
1702
0
        case HAS_EOF:
1703
0
            bgzf_mt_eof(fp);   // Sets mt->command to HAS_EOF_DONE
1704
0
            pthread_mutex_unlock(&mt->command_m);
1705
0
            break;
1706
1707
0
        case SEEK_DONE:
1708
0
        case HAS_EOF_DONE:
1709
0
            pthread_cond_signal(&mt->command_c);
1710
0
            pthread_mutex_unlock(&mt->command_m);
1711
0
            break;
1712
1713
0
        case CLOSE:
1714
0
            pthread_cond_signal(&mt->command_c);
1715
0
            pthread_mutex_unlock(&mt->command_m);
1716
0
            hts_tpool_process_destroy(mt->out_queue);
1717
0
            return NULL;
1718
0
        }
1719
0
    }
1720
1721
0
 err:
1722
0
    pthread_mutex_lock(&mt->command_m);
1723
0
    mt->command = CLOSE;
1724
0
    pthread_cond_signal(&mt->command_c);
1725
0
    pthread_mutex_unlock(&mt->command_m);
1726
0
    hts_tpool_process_destroy(mt->out_queue);
1727
0
    return NULL;
1728
0
}
1729
1730
0
int bgzf_thread_pool(BGZF *fp, hts_tpool *pool, int qsize) {
1731
    // No gain from multi-threading when not compressed
1732
0
    if (!fp->is_compressed)
1733
0
        return 0;
1734
1735
0
    mtaux_t *mt;
1736
0
    mt = (mtaux_t*)calloc(1, sizeof(mtaux_t));
1737
0
    if (!mt) return -1;
1738
0
    fp->mt = mt;
1739
1740
0
    mt->pool = pool;
1741
0
    mt->n_threads = hts_tpool_size(pool);
1742
0
    if (!qsize)
1743
0
        qsize = mt->n_threads*2;
1744
0
    if (!(mt->out_queue = hts_tpool_process_init(mt->pool, qsize, 0)))
1745
0
        goto err;
1746
0
    hts_tpool_process_ref_incr(mt->out_queue);
1747
1748
0
    mt->job_pool = pool_create(sizeof(bgzf_job));
1749
0
    if (!mt->job_pool)
1750
0
        goto err;
1751
1752
0
    pthread_mutex_init(&mt->job_pool_m, NULL);
1753
0
    pthread_mutex_init(&mt->command_m, NULL);
1754
0
    pthread_mutex_init(&mt->idx_m, NULL);
1755
0
    pthread_cond_init(&mt->command_c, NULL);
1756
0
    mt->flush_pending = 0;
1757
0
    mt->jobs_pending = 0;
1758
0
    mt->free_block = fp->uncompressed_block; // currently in-use block
1759
0
    mt->block_address = fp->block_address;
1760
0
    pthread_create(&mt->io_task, NULL,
1761
0
                   fp->is_write ? bgzf_mt_writer : bgzf_mt_reader, fp);
1762
1763
0
    return 0;
1764
1765
0
 err:
1766
0
    free(mt);
1767
0
    fp->mt = NULL;
1768
0
    return -1;
1769
0
}
1770
1771
int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1772
0
{
1773
    // No gain from multi-threading when not compressed
1774
0
    if (!fp->is_compressed || fp->is_gzip)
1775
0
        return 0;
1776
1777
0
    if (n_threads < 1) return -1;
1778
0
    hts_tpool *p = hts_tpool_init(n_threads);
1779
0
    if (!p)
1780
0
        return -1;
1781
1782
0
    if (bgzf_thread_pool(fp, p, 0) != 0) {
1783
0
        hts_tpool_destroy(p);
1784
0
        return -1;
1785
0
    }
1786
1787
0
    fp->mt->own_pool = 1;
1788
1789
0
    return 0;
1790
0
}
1791
1792
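bgzf_thread_pool() and bgzf_mt() above are the two public ways to attach worker threads to a handle. A minimal sketch of both, assuming already-open BGZF handles and the declarations in htslib/bgzf.h and htslib/thread_pool.h; the helper names and thread counts are illustrative only:

#include "htslib/bgzf.h"
#include "htslib/thread_pool.h"

/* Option 1: a private pool of four workers, owned by the handle (own_pool=1),
 * so bgzf_close() tears it down via mt_destroy() below. */
static int enable_private_pool(BGZF *fp) {
    return bgzf_mt(fp, 4, 0);               /* n_sub_blks is ignored by the code above */
}

/* Option 2: share one hts_tpool between several handles; the caller keeps
 * ownership and calls hts_tpool_destroy() after the handles are closed. */
static int enable_shared_pool(BGZF *in, BGZF *out, hts_tpool *pool) {
    if (bgzf_thread_pool(in, pool, 0) < 0) return -1;   /* qsize 0 => 2 * n_threads */
    return bgzf_thread_pool(out, pool, 0);
}
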
static int mt_destroy(mtaux_t *mt)
1793
0
{
1794
0
    int ret = 0;
1795
1796
    // Tell the reader to shut down
1797
0
    pthread_mutex_lock(&mt->command_m);
1798
0
    mt->command = CLOSE;
1799
0
    pthread_cond_signal(&mt->command_c);
1800
0
    hts_tpool_wake_dispatch(mt->out_queue); // unstick the reader
1801
0
    pthread_mutex_unlock(&mt->command_m);
1802
1803
    // Check for thread worker failure, indicated by is_shutdown returning 2
1804
    // It's possible really late errors might be missed, but we can live with
1805
    // that.
1806
0
    ret = -(hts_tpool_process_is_shutdown(mt->out_queue) > 1);
1807
    // Destroying the queue first forces the writer to exit.
1808
    // mt->out_queue is reference counted, so destroy gets called in both
1809
    // this and the IO threads.  The last to do it will clean up.
1810
0
    hts_tpool_process_destroy(mt->out_queue);
1811
1812
    // IO thread will now exit.  Wait for it and perform final clean-up.
1813
    // If it returned non-NULL, it was not happy.
1814
0
    void *retval = NULL;
1815
0
    pthread_join(mt->io_task, &retval);
1816
0
    ret = retval != NULL ? -1 : ret;
1817
1818
0
    pthread_mutex_destroy(&mt->job_pool_m);
1819
0
    pthread_mutex_destroy(&mt->command_m);
1820
0
    pthread_mutex_destroy(&mt->idx_m);
1821
0
    pthread_cond_destroy(&mt->command_c);
1822
0
    if (mt->curr_job)
1823
0
        pool_free(mt->job_pool, mt->curr_job);
1824
1825
0
    if (mt->own_pool)
1826
0
        hts_tpool_destroy(mt->pool);
1827
1828
0
    pool_destroy(mt->job_pool);
1829
1830
0
    if (mt->idx_cache.e)
1831
0
        free(mt->idx_cache.e);
1832
1833
0
    free(mt);
1834
0
    fflush(stderr);
1835
1836
0
    return ret;
1837
0
}
1838
1839
static int mt_queue(BGZF *fp)
1840
0
{
1841
0
    mtaux_t *mt = fp->mt;
1842
1843
0
    mt->block_number++;
1844
1845
    // Also updated by writer thread
1846
0
    pthread_mutex_lock(&mt->job_pool_m);
1847
0
    bgzf_job *j = pool_alloc(mt->job_pool);
1848
0
    if (j) mt->jobs_pending++;
1849
0
    pthread_mutex_unlock(&mt->job_pool_m);
1850
0
    if (!j) return -1;
1851
1852
0
    j->fp = fp;
1853
0
    j->errcode = 0;
1854
0
    j->uncomp_len  = fp->block_offset;
1855
0
    if (fp->compress_level == 0) {
1856
0
        memcpy(j->comp_data + BLOCK_HEADER_LENGTH + 5, fp->uncompressed_block,
1857
0
               j->uncomp_len);
1858
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue,
1859
0
                                bgzf_encode_level0_func, j,
1860
0
                                job_cleanup, job_cleanup, 0) < 0) {
1861
0
            goto fail;
1862
0
        }
1863
0
    } else {
1864
0
        memcpy(j->uncomp_data, fp->uncompressed_block, j->uncomp_len);
1865
1866
        // TODO: need a non-blocking version & job_pending handling?
1867
0
        if (hts_tpool_dispatch3(mt->pool, mt->out_queue, bgzf_encode_func, j,
1868
0
                                job_cleanup, job_cleanup, 0) < 0) {
1869
0
            goto fail;
1870
0
        }
1871
0
    }
1872
1873
0
    fp->block_offset = 0;
1874
0
    return 0;
1875
1876
0
 fail:
1877
0
    job_cleanup(j);
1878
0
    pthread_mutex_lock(&mt->job_pool_m);
1879
0
    mt->jobs_pending--;
1880
0
    pthread_mutex_unlock(&mt->job_pool_m);
1881
0
    return -1;
1882
0
}
1883
1884
static int mt_flush_queue(BGZF *fp)
1885
0
{
1886
0
    mtaux_t *mt = fp->mt;
1887
1888
    // Drain the encoder jobs.
1889
    // We cannot use hts_tpool_flush here as it can cause deadlock if
1890
    // the queue is full up of decoder tasks.  The best solution would
1891
    // be to have one input queue per type of job, but we don't right now.
1892
    //hts_tpool_flush(mt->pool);
1893
0
    pthread_mutex_lock(&mt->job_pool_m);
1894
0
    int shutdown = 0;
1895
0
    while (mt->jobs_pending != 0) {
1896
0
        if ((shutdown = hts_tpool_process_is_shutdown(mt->out_queue)))
1897
0
            break;
1898
0
        pthread_mutex_unlock(&mt->job_pool_m);
1899
0
        usleep(10000); // FIXME: replace by condition variable
1900
0
        pthread_mutex_lock(&mt->job_pool_m);
1901
0
    }
1902
0
    pthread_mutex_unlock(&mt->job_pool_m);
1903
1904
0
    if (shutdown)
1905
0
        return -1;
1906
1907
    // Wait on bgzf_mt_writer to drain the queue
1908
0
    if (hts_tpool_process_flush(mt->out_queue) != 0)
1909
0
        return -1;
1910
1911
0
    return (fp->errcode == 0)? 0 : -1;
1912
0
}
1913
1914
static int lazy_flush(BGZF *fp)
1915
0
{
1916
0
    if (fp->mt)
1917
0
        return fp->block_offset ? mt_queue(fp) : 0;
1918
0
    else
1919
0
        return bgzf_flush(fp);
1920
0
}
1921
1922
#else  // ~ #ifdef BGZF_MT
1923
1924
int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1925
{
1926
    return 0;
1927
}
1928
1929
static inline int lazy_flush(BGZF *fp)
1930
{
1931
    return bgzf_flush(fp);
1932
}
1933
1934
#endif // ~ #ifdef BGZF_MT
1935
1936
int bgzf_flush(BGZF *fp)
1937
0
{
1938
0
    if (!fp->is_write) return 0;
1939
0
#ifdef BGZF_MT
1940
0
    if (fp->mt) {
1941
0
        int ret = 0;
1942
0
        if (fp->block_offset) ret = mt_queue(fp);
1943
0
        if (!ret) ret = mt_flush_queue(fp);
1944
1945
        // We maintain mt->block_address when threading as the
1946
        // main code can call bgzf_tell without any locks.
1947
        // (The result from bgzf_tell is wrong, but we only care about the last
1948
        // 16 bits' worth, except for the final flush process.)
1949
0
        pthread_mutex_lock(&fp->mt->idx_m);
1950
0
        fp->block_address = fp->mt->block_address;
1951
0
        pthread_mutex_unlock(&fp->mt->idx_m);
1952
1953
0
        return ret;
1954
0
    }
1955
0
#endif
1956
0
    while (fp->block_offset > 0) {
1957
0
        int block_length;
1958
0
        if ( fp->idx_build_otf )
1959
0
        {
1960
0
            bgzf_index_add_block(fp);
1961
0
            fp->idx->ublock_addr += fp->block_offset;
1962
0
        }
1963
0
        block_length = deflate_block(fp, fp->block_offset);
1964
0
        if (block_length < 0) {
1965
0
            hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length, NULL));
1966
0
            return -1;
1967
0
        }
1968
0
        if (hwrite(fp->fp, fp->compressed_block, block_length) != block_length) {
1969
0
            hts_log_error("File write failed (wrong size)");
1970
0
            fp->errcode |= BGZF_ERR_IO; // possibly truncated file
1971
0
            return -1;
1972
0
        }
1973
0
        fp->block_address += block_length;
1974
0
    }
1975
0
    return 0;
1976
0
}
1977
1978
int bgzf_flush_try(BGZF *fp, ssize_t size)
1979
0
{
1980
0
    if (fp->block_offset + size > BGZF_BLOCK_SIZE) return lazy_flush(fp);
1981
0
    return 0;
1982
0
}
1983
1984
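bgzf_flush_try() above flushes only when the pending write would push the block past BGZF_BLOCK_SIZE; together with bgzf_write() below it gives the usual record-writing pattern. A hedged sketch (write_record and its arguments are hypothetical):

#include <sys/types.h>
#include "htslib/bgzf.h"

/* Flush first if appending rec_len bytes would overflow the current block,
 * so that, where possible, a record starts in a fresh BGZF block. */
static int write_record(BGZF *fp, const void *rec, size_t rec_len) {
    if (bgzf_flush_try(fp, (ssize_t)rec_len) < 0) return -1;
    return bgzf_write(fp, rec, rec_len) == (ssize_t)rec_len ? 0 : -1;
}
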
ssize_t bgzf_write(BGZF *fp, const void *data, size_t length)
1985
0
{
1986
0
    if ( !fp->is_compressed ) {
1987
0
        size_t push = length + (size_t) fp->block_offset;
1988
0
        fp->block_offset = push % BGZF_MAX_BLOCK_SIZE;
1989
0
        fp->block_address += (push - fp->block_offset);
1990
0
        return hwrite(fp->fp, data, length);
1991
0
    }
1992
1993
0
    const uint8_t *input = (const uint8_t*)data;
1994
0
    ssize_t remaining = length;
1995
0
    assert(fp->is_write);
1996
0
    while (remaining > 0) {
1997
0
        uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
1998
0
        int copy_length = BGZF_BLOCK_SIZE - fp->block_offset;
1999
0
        if (copy_length > remaining) copy_length = remaining;
2000
0
        memcpy(buffer + fp->block_offset, input, copy_length);
2001
0
        fp->block_offset += copy_length;
2002
0
        input += copy_length;
2003
0
        remaining -= copy_length;
2004
0
        if (fp->block_offset == BGZF_BLOCK_SIZE) {
2005
0
            if (lazy_flush(fp) != 0) return -1;
2006
0
        }
2007
0
    }
2008
0
    return length - remaining;
2009
0
}
2010
2011
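A minimal end-to-end write sketch built on bgzf_write() above, assuming bgzf_open() and bgzf_close() from the public header; the file name and payload are illustrative:

#include <string.h>
#include <sys/types.h>
#include "htslib/bgzf.h"

static int write_example(void) {
    BGZF *fp = bgzf_open("example.txt.gz", "w");   /* "w" selects BGZF-compressed output */
    if (!fp) return -1;
    const char line[] = "hello, bgzf\n";
    int ret = bgzf_write(fp, line, strlen(line)) == (ssize_t)strlen(line) ? 0 : -1;
    if (bgzf_close(fp) < 0) ret = -1;              /* final flush plus the 28-byte EOF block */
    return ret;
}
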
ssize_t bgzf_block_write(BGZF *fp, const void *data, size_t length)
2012
0
{
2013
0
    if ( !fp->is_compressed ) {
2014
0
        size_t push = length + (size_t) fp->block_offset;
2015
0
        fp->block_offset = push % BGZF_MAX_BLOCK_SIZE;
2016
0
        fp->block_address += (push - fp->block_offset);
2017
0
        return hwrite(fp->fp, data, length);
2018
0
    }
2019
2020
0
    const uint8_t *input = (const uint8_t*)data;
2021
0
    ssize_t remaining = length;
2022
0
    assert(fp->is_write);
2023
0
    uint64_t current_block; //keep track of current block
2024
0
    uint64_t ublock_size; // amount of uncompressed data to be fed into next block
2025
0
    while (remaining > 0) {
2026
0
        current_block = fp->idx->moffs - fp->idx->noffs;
2027
0
        ublock_size = current_block + 1 < fp->idx->moffs ? fp->idx->offs[current_block+1].uaddr-fp->idx->offs[current_block].uaddr : BGZF_MAX_BLOCK_SIZE;
2028
0
        uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
2029
0
        int copy_length = ublock_size - fp->block_offset;
2030
0
        if (copy_length > remaining) copy_length = remaining;
2031
0
        memcpy(buffer + fp->block_offset, input, copy_length);
2032
0
        fp->block_offset += copy_length;
2033
0
        input += copy_length;
2034
0
        remaining -= copy_length;
2035
0
        if (fp->block_offset == ublock_size) {
2036
0
            if (lazy_flush(fp) != 0) return -1;
2037
0
            if (fp->idx->noffs > 0)
2038
0
                fp->idx->noffs--;  // decrement noffs to track the blocks
2039
0
        }
2040
0
    }
2041
0
    return length - remaining;
2042
0
}
2043
2044
2045
ssize_t bgzf_raw_write(BGZF *fp, const void *data, size_t length)
2046
0
{
2047
0
    ssize_t ret = hwrite(fp->fp, data, length);
2048
0
    if (ret < 0) fp->errcode |= BGZF_ERR_IO;
2049
0
    return ret;
2050
0
}
2051
2052
// Helper function for tidying up fp->mt and setting errcode
2053
1.18k
static void bgzf_close_mt(BGZF *fp) {
2054
1.18k
    if (fp->mt) {
2055
0
        if (!fp->mt->free_block)
2056
0
            fp->uncompressed_block = NULL;
2057
0
        if (mt_destroy(fp->mt) < 0)
2058
0
            fp->errcode = BGZF_ERR_IO;
2059
0
    }
2060
1.18k
}
2061
2062
int bgzf_close(BGZF* fp)
2063
1.18k
{
2064
1.18k
    int ret, block_length;
2065
1.18k
    if (fp == 0) return -1;
2066
1.18k
    if (fp->is_write && fp->is_compressed) {
2067
0
        if (bgzf_flush(fp) != 0) {
2068
0
            bgzf_close_mt(fp);
2069
0
            return -1;
2070
0
        }
2071
0
        fp->compress_level = -1;
2072
0
        block_length = deflate_block(fp, 0); // write an empty block
2073
0
        if (block_length < 0) {
2074
0
            hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length, NULL));
2075
0
            bgzf_close_mt(fp);
2076
0
            return -1;
2077
0
        }
2078
0
        if (hwrite(fp->fp, fp->compressed_block, block_length) < 0
2079
0
            || hflush(fp->fp) != 0) {
2080
0
            hts_log_error("File write failed");
2081
0
            fp->errcode |= BGZF_ERR_IO;
2082
0
            return -1;
2083
0
        }
2084
0
    }
2085
2086
1.18k
    bgzf_close_mt(fp);
2087
2088
1.18k
    if ( fp->is_gzip )
2089
864
    {
2090
864
        if (fp->gz_stream == NULL) ret = Z_OK;
2091
864
        else if (!fp->is_write) ret = inflateEnd(fp->gz_stream);
2092
0
        else ret = deflateEnd(fp->gz_stream);
2093
864
        if (ret != Z_OK) {
2094
0
            hts_log_error("Call to inflateEnd/deflateEnd failed: %s", bgzf_zerr(ret, NULL));
2095
0
        }
2096
864
        free(fp->gz_stream);
2097
864
    }
2098
1.18k
    ret = hclose(fp->fp);
2099
1.18k
    if (ret != 0) return -1;
2100
1.18k
    bgzf_index_destroy(fp);
2101
1.18k
    free(fp->uncompressed_block);
2102
1.18k
    free_cache(fp);
2103
1.18k
    ret = fp->errcode ? -1 : 0;
2104
1.18k
    free(fp);
2105
1.18k
    return ret;
2106
1.18k
}
2107
2108
void bgzf_set_cache_size(BGZF *fp, int cache_size)
2109
0
{
2110
0
    if (fp && fp->mt) return; // Not appropriate when multi-threading
2111
0
    if (fp && fp->cache) fp->cache_size = cache_size;
2112
0
}
2113
2114
139
int bgzf_check_EOF(BGZF *fp) {
2115
139
    int has_eof;
2116
2117
139
    if (fp->mt) {
2118
0
        pthread_mutex_lock(&fp->mt->command_m);
2119
        // fp->mt->command state transitions should be:
2120
        // NONE -> HAS_EOF -> HAS_EOF_DONE -> NONE
2121
        // (HAS_EOF -> HAS_EOF_DONE happens in bgzf_mt_reader thread)
2122
0
        if (fp->mt->command != CLOSE)
2123
0
            fp->mt->command = HAS_EOF;
2124
0
        pthread_cond_signal(&fp->mt->command_c);
2125
0
        hts_tpool_wake_dispatch(fp->mt->out_queue);
2126
0
        do {
2127
0
            if (fp->mt->command == CLOSE) {
2128
                // possible error in bgzf_mt_reader
2129
0
                pthread_mutex_unlock(&fp->mt->command_m);
2130
0
                return 0;
2131
0
            }
2132
0
            pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
2133
0
            switch (fp->mt->command) {
2134
0
            case HAS_EOF_DONE: break;
2135
0
            case HAS_EOF:
2136
                // Resend signal intended for bgzf_mt_reader()
2137
0
                pthread_cond_signal(&fp->mt->command_c);
2138
0
                break;
2139
0
            case CLOSE:
2140
0
                continue;
2141
0
            default:
2142
0
                abort();  // Should not get to any other state
2143
0
            }
2144
0
        } while (fp->mt->command != HAS_EOF_DONE);
2145
0
        fp->mt->command = NONE;
2146
0
        has_eof = fp->mt->eof;
2147
0
        pthread_mutex_unlock(&fp->mt->command_m);
2148
139
    } else {
2149
139
        has_eof = bgzf_check_EOF_common(fp);
2150
139
    }
2151
2152
139
    fp->no_eof_block = (has_eof == 0);
2153
2154
139
    return has_eof;
2155
139
}
2156
2157
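A small sketch of using bgzf_check_EOF() above to warn about truncated input, following its return convention (1 = marker present, 0 = absent, 2 = unseekable so unknown, negative = error); warn_if_truncated is a hypothetical helper:

#include <stdio.h>
#include "htslib/bgzf.h"

static int warn_if_truncated(BGZF *fp, const char *fn) {
    int ret = bgzf_check_EOF(fp);
    if (ret < 0) return -1;                  /* I/O error while probing the file tail */
    if (ret == 0)                            /* 1 = marker present, 2 = cannot tell */
        fprintf(stderr, "[W] %s: no BGZF EOF marker; file may be truncated\n", fn);
    return 0;
}
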
static inline int64_t bgzf_seek_common(BGZF* fp,
2158
                                       int64_t block_address, int block_offset)
2159
0
{
2160
0
    if (fp->mt) {
2161
        // The reader runs asynchronous and does loops of:
2162
        //    Read block
2163
        //    Check & process command
2164
        //    Dispatch decode job
2165
        //
2166
        // Once at EOF it then switches to loops of
2167
        //    Wait for command
2168
        //    Process command (possibly switching back to above loop).
2169
        //
2170
        // To seek we therefore send the reader thread a SEEK command,
2171
        // waking it up if blocked in dispatch and signalling if
2172
        // waiting for a command.  We then wait for the response so we
2173
        // know the seek succeeded.
2174
0
        pthread_mutex_lock(&fp->mt->command_m);
2175
0
        fp->mt->hit_eof = 0;
2176
        // fp->mt->command state transitions should be:
2177
        // NONE -> SEEK -> SEEK_DONE -> NONE
2178
        // (SEEK -> SEEK_DONE happens in bgzf_mt_reader thread)
2179
0
        fp->mt->command = SEEK;
2180
0
        fp->mt->block_address = block_address;
2181
0
        pthread_cond_signal(&fp->mt->command_c);
2182
0
        hts_tpool_wake_dispatch(fp->mt->out_queue);
2183
0
        do {
2184
0
            pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
2185
0
            switch (fp->mt->command) {
2186
0
            case SEEK_DONE: break;
2187
0
            case SEEK:
2188
                // Resend signal intended for bgzf_mt_reader()
2189
0
                pthread_cond_signal(&fp->mt->command_c);
2190
0
                break;
2191
0
            default:
2192
0
                abort();  // Should not get to any other state
2193
0
            }
2194
0
        } while (fp->mt->command != SEEK_DONE);
2195
0
        fp->mt->command = NONE;
2196
2197
0
        fp->block_length = 0;  // indicates current block has not been loaded
2198
0
        fp->block_address = block_address;
2199
0
        fp->block_offset = block_offset;
2200
2201
0
        pthread_mutex_unlock(&fp->mt->command_m);
2202
0
    } else {
2203
0
        if (hseek(fp->fp, block_address, SEEK_SET) < 0) {
2204
0
            fp->errcode |= BGZF_ERR_IO;
2205
0
            return -1;
2206
0
        }
2207
0
        fp->block_length = 0;  // indicates current block has not been loaded
2208
0
        fp->block_address = block_address;
2209
0
        fp->block_offset = block_offset;
2210
0
    }
2211
2212
0
    return 0;
2213
0
}
2214
2215
int64_t bgzf_seek(BGZF* fp, int64_t pos, int where)
2216
0
{
2217
0
    if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
2218
0
        fp->errcode |= BGZF_ERR_MISUSE;
2219
0
        return -1;
2220
0
    }
2221
2222
    // This is a flag to indicate we've jumped elsewhere in the stream, to act
2223
    // as a hint to any other code which is wrapping up bgzf for its own
2224
    // purposes.  We may not be able to tell when seek happens as it can be
2225
    // done on our behalf, eg by the iterator.
2226
    //
2227
    // This is never cleared here.  Any tool that needs to handle it is also
2228
    // responsible for clearing it.
2229
0
    fp->seeked = pos;
2230
2231
0
    return bgzf_seek_common(fp, pos >> 16, pos & 0xFFFF);
2232
0
}
2233
2234
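bgzf_seek() above takes a virtual offset: compressed block address in the upper bits, position within the uncompressed block in the low 16. A minimal tell/seek round trip, assuming the bgzf_tell() macro and bgzf_read() from htslib/bgzf.h; the helper name and buffer size are arbitrary:

#include <stdio.h>      /* SEEK_SET */
#include <stdint.h>
#include "htslib/bgzf.h"

static int peek_then_rewind(BGZF *fp) {
    int64_t voff = bgzf_tell(fp);            /* virtual offset of the current position */
    char buf[64];
    if (bgzf_read(fp, buf, sizeof(buf)) < 0) return -1;
    return bgzf_seek(fp, voff, SEEK_SET) < 0 ? -1 : 0;   /* jump straight back */
}
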
int bgzf_is_bgzf(const char *fn)
2235
0
{
2236
0
    uint8_t buf[16];
2237
0
    int n;
2238
0
    hFILE *fp;
2239
0
    if ((fp = hopen(fn, "r")) == 0) return 0;
2240
0
    n = hread(fp, buf, 16);
2241
0
    if (hclose(fp) < 0) return 0;
2242
0
    if (n != 16) return 0;
2243
0
    return check_header(buf) == 0? 1 : 0;
2244
0
}
2245
2246
int bgzf_compression(BGZF *fp)
2247
0
{
2248
0
    return (!fp->is_compressed)? no_compression : (fp->is_gzip)? gzip : bgzf;
2249
0
}
2250
2251
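Because bgzf_seek() and bgzf_useek() refuse plain gzip streams, a caller may want to probe the format first. A sketch using bgzf_compression() above; the enum constants come from htslib/hts.h and open_seekable is a hypothetical helper:

#include "htslib/bgzf.h"
#include "htslib/hts.h"     /* no_compression / gzip / bgzf constants */

static BGZF *open_seekable(const char *fn) {
    BGZF *fp = bgzf_open(fn, "r");
    if (!fp) return NULL;
    if (bgzf_compression(fp) != bgzf) {      /* plain text or ordinary gzip */
        bgzf_close(fp);
        return NULL;
    }
    return fp;
}
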
int bgzf_getc(BGZF *fp)
2252
0
{
2253
0
    if (fp->block_offset+1 < fp->block_length) {
2254
0
        fp->uncompressed_address++;
2255
0
        return ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
2256
0
    }
2257
2258
0
    int c;
2259
0
    if (fp->block_offset >= fp->block_length) {
2260
0
        if (bgzf_read_block(fp) != 0) return -2; /* error */
2261
0
        if (fp->block_length == 0) return -1; /* end-of-file */
2262
0
    }
2263
0
    c = ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
2264
0
    if (fp->block_offset == fp->block_length) {
2265
0
        fp->block_address = bgzf_htell(fp);
2266
0
        fp->block_offset = 0;
2267
0
        fp->block_length = 0;
2268
0
    }
2269
0
    fp->uncompressed_address++;
2270
0
    return c;
2271
0
}
2272
2273
int bgzf_getline(BGZF *fp, int delim, kstring_t *str)
2274
9.19M
{
2275
9.19M
    int l, state = 0;
2276
9.19M
    str->l = 0;
2277
9.30M
    do {
2278
9.30M
        if (fp->block_offset >= fp->block_length) {
2279
110k
            if (bgzf_read_block(fp) != 0) { state = -2; break; }
2280
109k
            if (fp->block_length == 0) { state = -1; break; }
2281
109k
        }
2282
9.30M
        unsigned char *buf = fp->uncompressed_block;
2283
7.15G
        for (l = fp->block_offset; l < fp->block_length && buf[l] != delim; ++l);
2284
9.30M
        if (l < fp->block_length) state = 1;
2285
9.30M
        l -= fp->block_offset;
2286
9.30M
        if (ks_expand(str, l + 2) < 0) { state = -3; break; }
2287
9.30M
        memcpy(str->s + str->l, buf + fp->block_offset, l);
2288
9.30M
        str->l += l;
2289
9.30M
        fp->block_offset += l + 1;
2290
9.30M
        if (fp->block_offset >= fp->block_length) {
2291
109k
            fp->block_address = bgzf_htell(fp);
2292
109k
            fp->block_offset = 0;
2293
109k
            fp->block_length = 0;
2294
109k
        }
2295
9.30M
    } while (state == 0);
2296
9.19M
    if (state < -1) return state;
2297
9.19M
    if (str->l == 0 && state < 0) return state;
2298
9.19M
    fp->uncompressed_address += str->l + 1;
2299
9.19M
    if ( delim=='\n' && str->l>0 && str->s[str->l-1]=='\r' ) str->l--;
2300
9.19M
    str->s[str->l] = 0;
2301
9.19M
    return str->l <= INT_MAX ? (int) str->l : INT_MAX;
2302
9.19M
}
2303
2304
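A minimal sketch of the usual bgzf_getline() loop above, assuming kstring_t from htslib/kstring.h; count_lines is a hypothetical helper:

#include <stdlib.h>
#include "htslib/bgzf.h"
#include "htslib/kstring.h"

static int count_lines(BGZF *fp) {
    kstring_t str = {0, 0, NULL};
    int n = 0, ret;
    while ((ret = bgzf_getline(fp, '\n', &str)) >= 0)
        n++;                       /* str.s holds the line, NUL-terminated, delimiter stripped */
    free(str.s);
    return ret < -1 ? ret : n;     /* -1 is plain EOF; -2/-3 signal read/memory errors */
}
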
void bgzf_index_destroy(BGZF *fp)
2305
1.18k
{
2306
1.18k
    if ( !fp->idx ) return;
2307
0
    free(fp->idx->offs);
2308
0
    free(fp->idx);
2309
0
    fp->idx = NULL;
2310
0
    fp->idx_build_otf = 0;
2311
0
}
2312
2313
int bgzf_index_build_init(BGZF *fp)
2314
0
{
2315
0
    bgzf_index_destroy(fp);
2316
0
    fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
2317
0
    if ( !fp->idx ) return -1;
2318
0
    fp->idx_build_otf = 1;  // build index on the fly
2319
0
    return 0;
2320
0
}
2321
2322
int bgzf_index_add_block(BGZF *fp)
2323
0
{
2324
0
    fp->idx->noffs++;
2325
0
    if ( fp->idx->noffs > fp->idx->moffs )
2326
0
    {
2327
0
        fp->idx->moffs = fp->idx->noffs;
2328
0
        kroundup32(fp->idx->moffs);
2329
0
        fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
2330
0
        if ( !fp->idx->offs ) return -1;
2331
0
    }
2332
0
    fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->ublock_addr;
2333
0
    fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->block_address;
2334
0
    return 0;
2335
0
}
2336
2337
static inline int hwrite_uint64(uint64_t x, hFILE *f)
2338
0
{
2339
0
    if (ed_is_big()) x = ed_swap_8(x);
2340
0
    if (hwrite(f, &x, sizeof(x)) != sizeof(x)) return -1;
2341
0
    return 0;
2342
0
}
2343
2344
static char * get_name_suffix(const char *bname, const char *suffix)
2345
0
{
2346
0
    size_t len = strlen(bname) + strlen(suffix) + 1;
2347
0
    char *buff = malloc(len);
2348
0
    if (!buff) return NULL;
2349
0
    snprintf(buff, len, "%s%s", bname, suffix);
2350
0
    return buff;
2351
0
}
2352
2353
int bgzf_index_dump_hfile(BGZF *fp, struct hFILE *idx, const char *name)
2354
0
{
2355
    // Note that the index contains one extra record when indexing files opened
2356
    // for reading. The terminating record is not present when opened for writing.
2357
    // This is not a bug.
2358
2359
0
    int i;
2360
2361
0
    if (!fp->idx) {
2362
0
        hts_log_error("Called for BGZF handle with no index");
2363
0
        errno = EINVAL;
2364
0
        return -1;
2365
0
    }
2366
2367
0
    if (bgzf_flush(fp) != 0) return -1;
2368
2369
    // discard the entry marking the end of the file
2370
0
    if (fp->mt && fp->idx)
2371
0
        fp->idx->noffs--;
2372
2373
0
    if (hwrite_uint64(fp->idx->noffs - 1, idx) < 0) goto fail;
2374
0
    for (i=1; i<fp->idx->noffs; i++)
2375
0
    {
2376
0
        if (hwrite_uint64(fp->idx->offs[i].caddr, idx) < 0) goto fail;
2377
0
        if (hwrite_uint64(fp->idx->offs[i].uaddr, idx) < 0) goto fail;
2378
0
    }
2379
0
    return 0;
2380
2381
0
 fail:
2382
0
    hts_log_error("Error writing to %s : %s", name ? name : "index", strerror(errno));
2383
0
    return -1;
2384
0
}
2385
2386
int bgzf_index_dump(BGZF *fp, const char *bname, const char *suffix)
2387
0
{
2388
0
    const char *name = bname, *msg = NULL;
2389
0
    char *tmp = NULL;
2390
0
    hFILE *idx = NULL;
2391
2392
0
    if (!fp->idx) {
2393
0
        hts_log_error("Called for BGZF handle with no index");
2394
0
        errno = EINVAL;
2395
0
        return -1;
2396
0
    }
2397
2398
0
    if ( suffix )
2399
0
    {
2400
0
        tmp = get_name_suffix(bname, suffix);
2401
0
        if ( !tmp ) return -1;
2402
0
        name = tmp;
2403
0
    }
2404
2405
0
    idx = hopen(name, "wb");
2406
0
    if ( !idx ) {
2407
0
        msg = "Error opening";
2408
0
        goto fail;
2409
0
    }
2410
2411
0
    if (bgzf_index_dump_hfile(fp, idx, name) != 0) goto fail;
2412
2413
0
    if (hclose(idx) < 0)
2414
0
    {
2415
0
        idx = NULL;
2416
0
        msg = "Error on closing";
2417
0
        goto fail;
2418
0
    }
2419
2420
0
    free(tmp);
2421
0
    return 0;
2422
2423
0
 fail:
2424
0
    if (msg != NULL) {
2425
0
        hts_log_error("%s %s : %s", msg, name, strerror(errno));
2426
0
    }
2427
0
    if (idx) hclose_abruptly(idx);
2428
0
    free(tmp);
2429
0
    return -1;
2430
0
}
2431
2432
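A write-side sketch of the on-the-fly index: bgzf_index_build_init() is called before any data is written so that every flushed block is recorded, and bgzf_index_dump() above then emits the index; the file name, payload and helper name are illustrative:

#include <sys/types.h>
#include "htslib/bgzf.h"

static int write_with_index(const char *fn, const char *data, size_t len) {
    BGZF *fp = bgzf_open(fn, "w");
    if (!fp) return -1;
    int ret = 0;
    if (bgzf_index_build_init(fp) < 0) ret = -1;                     /* before the first write */
    if (ret == 0 && bgzf_write(fp, data, len) != (ssize_t)len) ret = -1;
    if (ret == 0 && bgzf_index_dump(fp, fn, ".gzi") < 0) ret = -1;   /* writes "<fn>.gzi" */
    if (bgzf_close(fp) < 0) ret = -1;
    return ret;
}
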
static inline int hread_uint64(uint64_t *xptr, hFILE *f)
2433
0
{
2434
0
    if (hread(f, xptr, sizeof(*xptr)) != sizeof(*xptr)) return -1;
2435
0
    if (ed_is_big()) ed_swap_8p(xptr);
2436
0
    return 0;
2437
0
}
2438
2439
int bgzf_index_load_hfile(BGZF *fp, struct hFILE *idx, const char *name)
2440
0
{
2441
0
    fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
2442
0
    if (fp->idx == NULL) goto fail;
2443
0
    uint64_t x;
2444
0
    if (hread_uint64(&x, idx) < 0) goto fail;
2445
2446
0
    fp->idx->noffs = fp->idx->moffs = x + 1;
2447
0
    fp->idx->offs  = (bgzidx1_t*) malloc(fp->idx->moffs*sizeof(bgzidx1_t));
2448
0
    if (fp->idx->offs == NULL) goto fail;
2449
0
    fp->idx->offs[0].caddr = fp->idx->offs[0].uaddr = 0;
2450
2451
0
    int i;
2452
0
    for (i=1; i<fp->idx->noffs; i++)
2453
0
    {
2454
0
        if (hread_uint64(&fp->idx->offs[i].caddr, idx) < 0) goto fail;
2455
0
        if (hread_uint64(&fp->idx->offs[i].uaddr, idx) < 0) goto fail;
2456
0
    }
2457
2458
0
    return 0;
2459
2460
0
 fail:
2461
0
    hts_log_error("Error reading %s : %s", name ? name : "index", strerror(errno));
2462
0
    if (fp->idx) {
2463
0
        free(fp->idx->offs);
2464
0
        free(fp->idx);
2465
0
        fp->idx = NULL;
2466
0
    }
2467
0
    return -1;
2468
0
}
2469
2470
int bgzf_index_load(BGZF *fp, const char *bname, const char *suffix)
2471
0
{
2472
0
    const char *name = bname, *msg = NULL;
2473
0
    char *tmp = NULL;
2474
0
    hFILE *idx = NULL;
2475
0
    if ( suffix )
2476
0
    {
2477
0
        tmp = get_name_suffix(bname, suffix);
2478
0
        if ( !tmp ) return -1;
2479
0
        name = tmp;
2480
0
    }
2481
2482
0
    idx = hopen(name, "rb");
2483
0
    if ( !idx ) {
2484
0
        msg = "Error opening";
2485
0
        goto fail;
2486
0
    }
2487
2488
0
    if (bgzf_index_load_hfile(fp, idx, name) != 0) goto fail;
2489
2490
0
    if (hclose(idx) != 0) {
2491
0
        idx = NULL;
2492
0
        msg = "Error closing";
2493
0
        goto fail;
2494
0
    }
2495
2496
0
    free(tmp);
2497
0
    return 0;
2498
2499
0
 fail:
2500
0
    if (msg != NULL) {
2501
0
        hts_log_error("%s %s : %s", msg, name, strerror(errno));
2502
0
    }
2503
0
    if (idx) hclose_abruptly(idx);
2504
0
    free(tmp);
2505
0
    return -1;
2506
0
}
2507
2508
int bgzf_useek(BGZF *fp, off_t uoffset, int where)
2509
0
{
2510
0
    if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
2511
0
        fp->errcode |= BGZF_ERR_MISUSE;
2512
0
        return -1;
2513
0
    }
2514
0
    if (uoffset >= fp->uncompressed_address - fp->block_offset &&
2515
0
        uoffset < fp->uncompressed_address + fp->block_length - fp->block_offset) {
2516
        // Can seek into existing data
2517
0
        fp->block_offset += uoffset - fp->uncompressed_address;
2518
0
        fp->uncompressed_address = uoffset;
2519
0
        return 0;
2520
0
    }
2521
0
    if ( !fp->is_compressed )
2522
0
    {
2523
0
        if (hseek(fp->fp, uoffset, SEEK_SET) < 0)
2524
0
        {
2525
0
            fp->errcode |= BGZF_ERR_IO;
2526
0
            return -1;
2527
0
        }
2528
0
        fp->block_length = 0;  // indicates current block has not been loaded
2529
0
        fp->block_address = uoffset;
2530
0
        fp->block_offset = 0;
2531
0
        if (bgzf_read_block(fp) < 0) {
2532
0
            fp->errcode |= BGZF_ERR_IO;
2533
0
            return -1;
2534
0
        }
2535
0
        fp->uncompressed_address = uoffset;
2536
0
        return 0;
2537
0
    }
2538
2539
0
    if ( !fp->idx )
2540
0
    {
2541
0
        fp->errcode |= BGZF_ERR_IO;
2542
0
        return -1;
2543
0
    }
2544
2545
    // binary search
2546
0
    int ilo = 0, ihi = fp->idx->noffs - 1;
2547
0
    while ( ilo<=ihi )
2548
0
    {
2549
0
        int i = (ilo+ihi)*0.5;
2550
0
        if ( uoffset < fp->idx->offs[i].uaddr ) ihi = i - 1;
2551
0
        else if ( uoffset >= fp->idx->offs[i].uaddr ) ilo = i + 1;
2552
0
        else break;
2553
0
    }
2554
0
    int i = ilo-1;
2555
0
    if (bgzf_seek_common(fp, fp->idx->offs[i].caddr, 0) < 0)
2556
0
        return -1;
2557
2558
0
    if ( bgzf_read_block(fp) < 0 ) {
2559
0
        fp->errcode |= BGZF_ERR_IO;
2560
0
        return -1;
2561
0
    }
2562
0
    if ( uoffset - fp->idx->offs[i].uaddr > 0 )
2563
0
    {
2564
0
        fp->block_offset = uoffset - fp->idx->offs[i].uaddr;
2565
0
        assert( fp->block_offset <= fp->block_length );     // TODO: handle skipped, unindexed blocks
2566
0
    }
2567
0
    fp->uncompressed_address = uoffset;
2568
0
    return 0;
2569
0
}
2570
2571
off_t bgzf_utell(BGZF *fp)
2572
0
{
2573
0
    return fp->uncompressed_address;    // currently maintained only when reading
2574
0
}
2575
2576
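The matching read-side sketch: load the index, position by uncompressed offset with bgzf_useek(), and confirm with bgzf_utell() above; the file names, helper name and target offset are illustrative:

#include <stdio.h>         /* SEEK_SET */
#include <sys/types.h>     /* off_t */
#include "htslib/bgzf.h"

static int read_at_uoffset(const char *fn, off_t uoffset, void *buf, size_t len) {
    BGZF *fp = bgzf_open(fn, "r");
    if (!fp) return -1;
    int ret = -1;
    if (bgzf_index_load(fp, fn, ".gzi") == 0 &&
        bgzf_useek(fp, uoffset, SEEK_SET) == 0 &&
        bgzf_utell(fp) == uoffset &&
        bgzf_read(fp, buf, len) >= 0)
        ret = 0;
    bgzf_close(fp);
    return ret;
}
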
/* prototype is in hfile_internal.h */
2577
0
struct hFILE *bgzf_hfile(struct BGZF *fp) {
2578
0
    return fp->fp;
2579
0
}