NVBIO
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
bgzf.c
Go to the documentation of this file.
1 /* The MIT License
2 
3  Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4  2011 Attractive Chaos <attractor@live.co.uk>
5 
6  Permission is hereby granted, free of charge, to any person obtaining a copy
7  of this software and associated documentation files (the "Software"), to deal
8  in the Software without restriction, including without limitation the rights
9  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  copies of the Software, and to permit persons to whom the Software is
11  furnished to do so, subject to the following conditions:
12 
13  The above copyright notice and this permission notice shall be included in
14  all copies or substantial portions of the Software.
15 
16  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  THE SOFTWARE.
23 */
24 
25 #include "config.h"
26 
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <errno.h>
31 #include <unistd.h>
32 #include <assert.h>
33 #include <pthread.h>
34 #include <sys/types.h>
35 #include <inttypes.h>
36 
37 #include "htslib/hts.h"
38 #include "htslib/bgzf.h"
39 #include "htslib/hfile.h"
40 
41 #define BLOCK_HEADER_LENGTH 18
42 #define BLOCK_FOOTER_LENGTH 8
43 
44 
45 /* BGZF/GZIP header (speciallized from RFC 1952; little endian):
46  +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
47  | 31|139| 8| 4| 0| 0|255| 6| 66| 67| 2|BLK_LEN|
48  +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
49  BGZF extension:
50  ^ ^ ^ ^
51  | | | |
52  FLG.EXTRA XLEN B C
53 
54  BGZF format is compatible with GZIP. It limits the size of each compressed
55  block to 2^16 bytes and adds and an extra "BC" field in the gzip header which
56  records the size.
57 
58 */
59 static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
60 
61 #ifdef BGZF_CACHE
62 typedef struct {
63  int size;
66 } cache_t;
67 #include "htslib/khash.h"
69 #endif
70 
71 typedef struct
72 {
73  uint64_t uaddr; // offset w.r.t. uncompressed data
74  uint64_t caddr; // offset w.r.t. compressed data
75 }
76 bgzidx1_t;
77 
78 struct __bgzidx_t
79 {
80  int noffs, moffs; // the size of the index, n:used, m:allocated
81  bgzidx1_t *offs; // offsets
82  uint64_t ublock_addr; // offset of the current block (uncompressed data)
83 };
84 
85 void bgzf_index_destroy(BGZF *fp);
86 int bgzf_index_add_block(BGZF *fp);
87 
88 static inline void packInt16(uint8_t *buffer, uint16_t value)
89 {
90  buffer[0] = value;
91  buffer[1] = value >> 8;
92 }
93 
94 static inline int unpackInt16(const uint8_t *buffer)
95 {
96  return buffer[0] | buffer[1] << 8;
97 }
98 
99 static inline void packInt32(uint8_t *buffer, uint32_t value)
100 {
101  buffer[0] = value;
102  buffer[1] = value >> 8;
103  buffer[2] = value >> 16;
104  buffer[3] = value >> 24;
105 }
106 
107 static BGZF *bgzf_read_init(hFILE *hfpr)
108 {
109  BGZF *fp;
110  uint8_t magic[2];
111  ssize_t n = hpeek(hfpr, magic, 2);
112  if (n < 0) return NULL;
113 
114  fp = (BGZF*)calloc(1, sizeof(BGZF));
115  if (fp == NULL) return NULL;
116 
117  fp->is_write = 0;
118  fp->is_compressed = (n==2 && magic[0]==0x1f && magic[1]==0x8b);
121 #ifdef BGZF_CACHE
122  fp->cache = kh_init(cache);
123 #endif
124  return fp;
125 }
126 
127 static BGZF *bgzf_write_init(int compress_level) // compress_level==-1 for the default level, -2 plain uncompressed
128 {
129  BGZF *fp;
130  fp = (BGZF*)calloc(1, sizeof(BGZF));
131  fp->is_write = 1;
132  if ( compress_level==-2 )
133  {
134  fp->is_compressed = 0;
135  return fp;
136  }
137  fp->is_compressed = 1;
140  fp->compress_level = compress_level < 0? Z_DEFAULT_COMPRESSION : compress_level; // Z_DEFAULT_COMPRESSION==-1
142  return fp;
143 }
144 // get the compress level from the mode string
145 static int mode2level(const char *__restrict mode)
146 {
147  int i, compress_level = -1;
148  for (i = 0; mode[i]; ++i)
149  if (mode[i] >= '0' && mode[i] <= '9') break;
150  if (mode[i]) compress_level = (int)mode[i] - '0';
151  if (strchr(mode, 'u')) compress_level = -2;
152  return compress_level;
153 }
154 
155 BGZF *bgzf_open(const char *path, const char *mode)
156 {
157  BGZF *fp = 0;
159  if (strchr(mode, 'r')) {
160  hFILE *fpr;
161  if ((fpr = hopen(path, mode)) == 0) return 0;
162  fp = bgzf_read_init(fpr);
163  if (fp == 0) { hclose_abruptly(fpr); return NULL; }
164  fp->fp = fpr;
165  } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
166  hFILE *fpw;
167  if ((fpw = hopen(path, mode)) == 0) return 0;
168  fp = bgzf_write_init(mode2level(mode));
169  fp->fp = fpw;
170  }
171  else { errno = EINVAL; return 0; }
172 
173  fp->is_be = ed_is_big();
174  return fp;
175 }
176 
177 BGZF *bgzf_dopen(int fd, const char *mode)
178 {
179  BGZF *fp = 0;
181  if (strchr(mode, 'r')) {
182  hFILE *fpr;
183  if ((fpr = hdopen(fd, mode)) == 0) return 0;
184  fp = bgzf_read_init(fpr);
185  if (fp == 0) { hclose_abruptly(fpr); return NULL; } // FIXME this closes fd
186  fp->fp = fpr;
187  } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
188  hFILE *fpw;
189  if ((fpw = hdopen(fd, mode)) == 0) return 0;
190  fp = bgzf_write_init(mode2level(mode));
191  fp->fp = fpw;
192  }
193  else { errno = EINVAL; return 0; }
194 
195  fp->is_be = ed_is_big();
196  return fp;
197 }
198 
199 BGZF *bgzf_hopen(hFILE *hfp, const char *mode)
200 {
201  BGZF *fp = NULL;
203  if (strchr(mode, 'r')) {
204  fp = bgzf_read_init(hfp);
205  if (fp == NULL) return NULL;
206  } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
207  fp = bgzf_write_init(mode2level(mode));
208  }
209  else { errno = EINVAL; return 0; }
210 
211  fp->fp = hfp;
212  fp->is_be = ed_is_big();
213  return fp;
214 }
215 
216 static int bgzf_compress(void *_dst, int *dlen, void *src, int slen, int level)
217 {
218  uint32_t crc;
219  z_stream zs;
220  uint8_t *dst = (uint8_t*)_dst;
221 
222  // compress the body
223  zs.zalloc = NULL; zs.zfree = NULL;
224  zs.next_in = (Bytef*)src;
225  zs.avail_in = slen;
226  zs.next_out = dst + BLOCK_HEADER_LENGTH;
227  zs.avail_out = *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
228  if (deflateInit2(&zs, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK) return -1; // -15 to disable zlib header/footer
229  if (deflate(&zs, Z_FINISH) != Z_STREAM_END) return -1;
230  if (deflateEnd(&zs) != Z_OK) return -1;
231  *dlen = zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
232  // write the header
233  memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
234  packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
235  // write the footer
236  crc = crc32(crc32(0L, NULL, 0L), (Bytef*)src, slen);
237  packInt32((uint8_t*)&dst[*dlen - 8], crc);
238  packInt32((uint8_t*)&dst[*dlen - 4], slen);
239  return 0;
240 }
241 
242 // Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
243 static int deflate_block(BGZF *fp, int block_length)
244 {
245  int comp_size = BGZF_MAX_BLOCK_SIZE;
246  if (bgzf_compress(fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level) != 0) {
247  fp->errcode |= BGZF_ERR_ZLIB;
248  return -1;
249  }
250  fp->block_offset = 0;
251  return comp_size;
252 }
253 
254 // Inflate the block in fp->compressed_block into fp->uncompressed_block
255 static int inflate_block(BGZF* fp, int block_length)
256 {
257  z_stream zs;
258  zs.zalloc = NULL;
259  zs.zfree = NULL;
260  zs.next_in = (Bytef*)fp->compressed_block + 18;
261  zs.avail_in = block_length - 16;
262  zs.next_out = (Bytef*)fp->uncompressed_block;
264 
265  if (inflateInit2(&zs, -15) != Z_OK) {
266  fp->errcode |= BGZF_ERR_ZLIB;
267  return -1;
268  }
269  if (inflate(&zs, Z_FINISH) != Z_STREAM_END) {
270  inflateEnd(&zs);
271  fp->errcode |= BGZF_ERR_ZLIB;
272  return -1;
273  }
274  if (inflateEnd(&zs) != Z_OK) {
275  fp->errcode |= BGZF_ERR_ZLIB;
276  return -1;
277  }
278  return zs.total_out;
279 }
280 
281 static int inflate_gzip_block(BGZF *fp, int cached)
282 {
283  int ret = Z_OK;
284  do
285  {
286  if ( !cached && fp->gz_stream->avail_out!=0 )
287  {
289  if ( fp->gz_stream->avail_in<=0 ) return fp->gz_stream->avail_in;
290  if ( fp->gz_stream->avail_in==0 ) break;
292  }
293  else cached = 0;
294  do
295  {
298  ret = inflate(fp->gz_stream, Z_NO_FLUSH);
299  if ( ret==Z_BUF_ERROR ) continue; // non-critical error
300  if ( ret<0 ) return -1;
301  unsigned int have = BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
302  if ( have ) return have;
303  }
304  while ( fp->gz_stream->avail_out == 0 );
305  }
306  while (ret != Z_STREAM_END);
308 }
309 
310 // Returns: 0 on success (BGZF header); -1 on non-BGZF GZIP header; -2 on error
311 static int check_header(const uint8_t *header)
312 {
313  if ( header[0] != 31 || header[1] != 139 || header[2] != 8 ) return -2;
314  return ((header[3] & 4) != 0
315  && unpackInt16((uint8_t*)&header[10]) == 6
316  && header[12] == 'B' && header[13] == 'C'
317  && unpackInt16((uint8_t*)&header[14]) == 2) ? 0 : -1;
318 }
319 
320 #ifdef BGZF_CACHE
321 static void free_cache(BGZF *fp)
322 {
323  khint_t k;
324  khash_t(cache) *h = (khash_t(cache)*)fp->cache;
325  if (fp->is_write) return;
326  for (k = kh_begin(h); k < kh_end(h); ++k)
327  if (kh_exist(h, k)) free(kh_val(h, k).block);
328  kh_destroy(cache, h);
329 }
330 
331 static int load_block_from_cache(BGZF *fp, int64_t block_address)
332 {
333  khint_t k;
334  cache_t *p;
335  khash_t(cache) *h = (khash_t(cache)*)fp->cache;
336  k = kh_get(cache, h, block_address);
337  if (k == kh_end(h)) return 0;
338  p = &kh_val(h, k);
339  if (fp->block_length != 0) fp->block_offset = 0;
340  fp->block_address = block_address;
341  fp->block_length = p->size;
343  if ( hseek(fp->fp, p->end_offset, SEEK_SET) < 0 )
344  {
345  // todo: move the error up
346  fprintf(stderr,"Could not hseek to %"PRId64"\n", p->end_offset);
347  exit(1);
348  }
349  return p->size;
350 }
351 
352 static void cache_block(BGZF *fp, int size)
353 {
354  int ret;
355  khint_t k;
356  cache_t *p;
357  khash_t(cache) *h = (khash_t(cache)*)fp->cache;
358  if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return;
359  if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > (uint32_t)fp->cache_size) {
360  /* A better way would be to remove the oldest block in the
361  * cache, but here we remove a random one for simplicity. This
362  * should not have a big impact on performance. */
363  for (k = kh_begin(h); k < kh_end(h); ++k)
364  if (kh_exist(h, k)) break;
365  if (k < kh_end(h)) {
366  free(kh_val(h, k).block);
367  kh_del(cache, h, k);
368  }
369  }
370  k = kh_put(cache, h, fp->block_address, &ret);
371  if (ret == 0) return; // if this happens, a bug!
372  p = &kh_val(h, k);
373  p->size = fp->block_length;
374  p->end_offset = fp->block_address + size;
375  p->block = (uint8_t*)malloc(BGZF_MAX_BLOCK_SIZE);
376  memcpy(kh_val(h, k).block, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
377 }
378 #else
379 static void free_cache(BGZF *fp) {}
380 static int load_block_from_cache(BGZF *fp, int64_t block_address) {return 0;}
381 static void cache_block(BGZF *fp, int size) {}
382 #endif
383 
385 {
386  uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
387  int count, size = 0, block_length, remaining;
388 
389  // Reading an uncompressed file
390  if ( !fp->is_compressed )
391  {
392  count = hread(fp->fp, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
393  if ( count==0 )
394  {
395  fp->block_length = 0;
396  return 0;
397  }
398  if (fp->block_length != 0) fp->block_offset = 0;
399  fp->block_address += count;
400  fp->block_length = count;
401  return 0;
402  }
403 
404  // Reading compressed file
405  int64_t block_address;
406  block_address = htell(fp->fp);
407  if ( fp->is_gzip )
408  {
409  count = inflate_gzip_block(fp, 0);
410  if ( count<0 )
411  {
412  fp->errcode |= BGZF_ERR_ZLIB;
413  return -1;
414  }
415  fp->block_length = count;
416  fp->block_address = block_address;
417  return 0;
418  }
419  if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
420  count = hread(fp->fp, header, sizeof(header));
421  if (count == 0) { // no data read
422  fp->block_length = 0;
423  return 0;
424  }
425  int ret;
426  if ( count != sizeof(header) || (ret=check_header(header))==-2 )
427  {
428  fp->errcode |= BGZF_ERR_HEADER;
429  return -1;
430  }
431  if ( ret==-1 )
432  {
433  // GZIP, not BGZF
434  uint8_t *cblock = (uint8_t*)fp->compressed_block;
435  memcpy(cblock, header, sizeof(header));
436  count = hread(fp->fp, cblock+sizeof(header), BGZF_BLOCK_SIZE - sizeof(header)) + sizeof(header);
437  int nskip = 10;
438 
439  // Check optional fields to skip: FLG.FNAME,FLG.FCOMMENT,FLG.FHCRC,FLG.FEXTRA
440  // Note: Some of these fields are untested, I did not have appropriate data available
441  if ( header[3] & 0x4 ) // FLG.FEXTRA
442  {
443  nskip += unpackInt16(&cblock[nskip]) + 2;
444  }
445  if ( header[3] & 0x8 ) // FLG.FNAME
446  {
447  while ( nskip<BGZF_BLOCK_SIZE && cblock[nskip] ) nskip++;
448  if ( nskip==BGZF_BLOCK_SIZE )
449  {
450  fp->errcode |= BGZF_ERR_HEADER;
451  return -1;
452  }
453  nskip++;
454  }
455  if ( header[3] & 0x10 ) // FLG.FCOMMENT
456  {
457  while ( nskip<BGZF_BLOCK_SIZE && cblock[nskip] ) nskip++;
458  if ( nskip==BGZF_BLOCK_SIZE )
459  {
460  fp->errcode |= BGZF_ERR_HEADER;
461  return -1;
462  }
463  nskip++;
464  }
465  if ( header[3] & 0x2 ) nskip += 2; // FLG.FHCRC
466 
467  fp->is_gzip = 1;
468  fp->gz_stream = (z_stream*) calloc(1,sizeof(z_stream));
469  int ret = inflateInit2(fp->gz_stream, -15);
470  if (ret != Z_OK)
471  {
472  fp->errcode |= BGZF_ERR_ZLIB;
473  return -1;
474  }
475  fp->gz_stream->avail_in = count - nskip;
476  fp->gz_stream->next_in = cblock + nskip;
477  count = inflate_gzip_block(fp, 1);
478  if ( count<0 )
479  {
480  fp->errcode |= BGZF_ERR_ZLIB;
481  return -1;
482  }
483  fp->block_length = count;
484  fp->block_address = block_address;
485  return 0;
486  }
487  size = count;
488  block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
489  compressed_block = (uint8_t*)fp->compressed_block;
490  memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
491  remaining = block_length - BLOCK_HEADER_LENGTH;
492  count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
493  if (count != remaining) {
494  fp->errcode |= BGZF_ERR_IO;
495  return -1;
496  }
497  size += count;
498  if ((count = inflate_block(fp, block_length)) < 0) return -1;
499  if (fp->block_length != 0) fp->block_offset = 0; // Do not reset offset if this read follows a seek.
500  fp->block_address = block_address;
501  fp->block_length = count;
502  if ( fp->idx_build_otf )
503  {
505  fp->idx->ublock_addr += count;
506  }
507  cache_block(fp, size);
508  return 0;
509 }
510 
511 ssize_t bgzf_read(BGZF *fp, void *data, size_t length)
512 {
513  ssize_t bytes_read = 0;
514  uint8_t *output = (uint8_t*)data;
515  if (length <= 0) return 0;
516  assert(fp->is_write == 0);
517  while (bytes_read < length) {
518  int copy_length, available = fp->block_length - fp->block_offset;
519  uint8_t *buffer;
520  if (available <= 0) {
521  if (bgzf_read_block(fp) != 0) return -1;
522  available = fp->block_length - fp->block_offset;
523  if (available <= 0) break;
524  }
525  copy_length = length - bytes_read < available? length - bytes_read : available;
526  buffer = (uint8_t*)fp->uncompressed_block;
527  memcpy(output, buffer + fp->block_offset, copy_length);
528  fp->block_offset += copy_length;
529  output += copy_length;
530  bytes_read += copy_length;
531  }
532  if (fp->block_offset == fp->block_length) {
533  fp->block_address = htell(fp->fp);
534  fp->block_offset = fp->block_length = 0;
535  }
536  fp->uncompressed_address += bytes_read;
537  return bytes_read;
538 }
539 
540 ssize_t bgzf_raw_read(BGZF *fp, void *data, size_t length)
541 {
542  return hread(fp->fp, data, length);
543 }
544 
545 #ifdef BGZF_MT
546 
547 typedef struct {
548  struct bgzf_mtaux_t *mt;
549  void *buf;
550  int i, errcode, toproc, compress_level;
551 } worker_t;
552 
553 typedef struct bgzf_mtaux_t {
555  volatile int proc_cnt;
556  void **blk;
557  int *len;
559  pthread_t *tid;
560  pthread_mutex_t lock;
561  pthread_cond_t cv;
562 } mtaux_t;
563 
564 static int worker_aux(worker_t *w)
565 {
566  int i, stop = 0;
567  // wait for condition: to process or all done
568  pthread_mutex_lock(&w->mt->lock);
569  while (!w->toproc && !w->mt->done)
570  pthread_cond_wait(&w->mt->cv, &w->mt->lock);
571  if (w->mt->done) stop = 1;
572  w->toproc = 0;
573  pthread_mutex_unlock(&w->mt->lock);
574  if (stop) return 1; // to quit the thread
575  w->errcode = 0;
576  for (i = w->i; i < w->mt->curr; i += w->mt->n_threads) {
577  int clen = BGZF_MAX_BLOCK_SIZE;
578  if (bgzf_compress(w->buf, &clen, w->mt->blk[i], w->mt->len[i], w->compress_level) != 0)
579  w->errcode |= BGZF_ERR_ZLIB;
580  memcpy(w->mt->blk[i], w->buf, clen);
581  w->mt->len[i] = clen;
582  }
583  __sync_fetch_and_add(&w->mt->proc_cnt, 1);
584  return 0;
585 }
586 
587 static void *mt_worker(void *data)
588 {
589  while (worker_aux((worker_t*)data) == 0);
590  return 0;
591 }
592 
593 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
594 {
595  int i;
596  mtaux_t *mt;
597  pthread_attr_t attr;
598  if (!fp->is_write || fp->mt || n_threads <= 1) return -1;
599  mt = (mtaux_t*)calloc(1, sizeof(mtaux_t));
600  mt->n_threads = n_threads;
601  mt->n_blks = n_threads * n_sub_blks;
602  mt->len = (int*)calloc(mt->n_blks, sizeof(int));
603  mt->blk = (void**)calloc(mt->n_blks, sizeof(void*));
604  for (i = 0; i < mt->n_blks; ++i)
605  mt->blk[i] = malloc(BGZF_MAX_BLOCK_SIZE);
606  mt->tid = (pthread_t*)calloc(mt->n_threads, sizeof(pthread_t)); // tid[0] is not used, as the worker 0 is launched by the master
607  mt->w = (worker_t*)calloc(mt->n_threads, sizeof(worker_t));
608  for (i = 0; i < mt->n_threads; ++i) {
609  mt->w[i].i = i;
610  mt->w[i].mt = mt;
611  mt->w[i].compress_level = fp->compress_level;
612  mt->w[i].buf = malloc(BGZF_MAX_BLOCK_SIZE);
613  }
614  pthread_attr_init(&attr);
615  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
616  pthread_mutex_init(&mt->lock, 0);
617  pthread_cond_init(&mt->cv, 0);
618  for (i = 1; i < mt->n_threads; ++i) // worker 0 is effectively launched by the master thread
619  pthread_create(&mt->tid[i], &attr, mt_worker, &mt->w[i]);
620  fp->mt = mt;
621  return 0;
622 }
623 
624 static void mt_destroy(mtaux_t *mt)
625 {
626  int i;
627  // signal all workers to quit
628  pthread_mutex_lock(&mt->lock);
629  mt->done = 1; mt->proc_cnt = 0;
630  pthread_cond_broadcast(&mt->cv);
631  pthread_mutex_unlock(&mt->lock);
632  for (i = 1; i < mt->n_threads; ++i) pthread_join(mt->tid[i], 0); // worker 0 is effectively launched by the master thread
633  // free other data allocated on heap
634  for (i = 0; i < mt->n_blks; ++i) free(mt->blk[i]);
635  for (i = 0; i < mt->n_threads; ++i) free(mt->w[i].buf);
636  free(mt->blk); free(mt->len); free(mt->w); free(mt->tid);
637  pthread_cond_destroy(&mt->cv);
638  pthread_mutex_destroy(&mt->lock);
639  free(mt);
640 }
641 
642 static void mt_queue(BGZF *fp)
643 {
644  mtaux_t *mt = fp->mt;
645  assert(mt->curr < mt->n_blks); // guaranteed by the caller
646  memcpy(mt->blk[mt->curr], fp->uncompressed_block, fp->block_offset);
647  mt->len[mt->curr] = fp->block_offset;
648  fp->block_offset = 0;
649  ++mt->curr;
650 }
651 
652 static int mt_flush_queue(BGZF *fp)
653 {
654  int i;
655  mtaux_t *mt = fp->mt;
656  // signal all the workers to compress
657  pthread_mutex_lock(&mt->lock);
658  for (i = 0; i < mt->n_threads; ++i) mt->w[i].toproc = 1;
659  mt->proc_cnt = 0;
660  pthread_cond_broadcast(&mt->cv);
661  pthread_mutex_unlock(&mt->lock);
662  // worker 0 is doing things here
663  worker_aux(&mt->w[0]);
664  // wait for all the threads to complete
665  while (mt->proc_cnt < mt->n_threads);
666  // dump data to disk
667  for (i = 0; i < mt->n_threads; ++i) fp->errcode |= mt->w[i].errcode;
668  for (i = 0; i < mt->curr; ++i)
669  if (hwrite(fp->fp, mt->blk[i], mt->len[i]) != mt->len[i]) {
670  fp->errcode |= BGZF_ERR_IO;
671  break;
672  }
673  mt->curr = 0;
674  return (fp->errcode == 0)? 0 : -1;
675 }
676 
677 static int lazy_flush(BGZF *fp)
678 {
679  if (fp->mt) {
680  if (fp->block_offset) mt_queue(fp);
681  return (fp->mt->curr < fp->mt->n_blks)? 0 : mt_flush_queue(fp);
682  }
683  else return bgzf_flush(fp);
684 }
685 
686 #else // ~ #ifdef BGZF_MT
687 
688 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
689 {
690  return 0;
691 }
692 
693 static inline int lazy_flush(BGZF *fp)
694 {
695  return bgzf_flush(fp);
696 }
697 
698 #endif // ~ #ifdef BGZF_MT
699 
700 int bgzf_flush(BGZF *fp)
701 {
702  if (!fp->is_write) return 0;
703 #ifdef BGZF_MT
704  if (fp->mt) {
705  if (fp->block_offset) mt_queue(fp); // guaranteed that assertion does not fail
706  return mt_flush_queue(fp);
707  }
708 #endif
709  while (fp->block_offset > 0) {
710  if ( fp->idx_build_otf )
711  {
713  fp->idx->ublock_addr += fp->block_offset;
714  }
715  int block_length = deflate_block(fp, fp->block_offset);
716  if (block_length < 0) return -1;
717  if (hwrite(fp->fp, fp->compressed_block, block_length) != block_length) {
718  fp->errcode |= BGZF_ERR_IO; // possibly truncated file
719  return -1;
720  }
721  fp->block_address += block_length;
722  }
723  return 0;
724 }
725 
726 int bgzf_flush_try(BGZF *fp, ssize_t size)
727 {
728  if (fp->block_offset + size > BGZF_BLOCK_SIZE) return lazy_flush(fp);
729  return 0;
730 }
731 
732 ssize_t bgzf_write(BGZF *fp, const void *data, size_t length)
733 {
734  if ( !fp->is_compressed )
735  return hwrite(fp->fp, data, length);
736 
737  const uint8_t *input = (const uint8_t*)data;
738  ssize_t remaining = length;
739  assert(fp->is_write);
740  while (remaining > 0) {
741  uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
742  int copy_length = BGZF_BLOCK_SIZE - fp->block_offset;
743  if (copy_length > remaining) copy_length = remaining;
744  memcpy(buffer + fp->block_offset, input, copy_length);
745  fp->block_offset += copy_length;
746  input += copy_length;
747  remaining -= copy_length;
748  if (fp->block_offset == BGZF_BLOCK_SIZE) {
749  if (lazy_flush(fp) != 0) return -1;
750  }
751  }
752  return length - remaining;
753 }
754 
755 ssize_t bgzf_raw_write(BGZF *fp, const void *data, size_t length)
756 {
757  return hwrite(fp->fp, data, length);
758 }
759 
760 int bgzf_close(BGZF* fp)
761 {
762  int ret, block_length;
763  if (fp == 0) return -1;
764  if (fp->is_write && fp->is_compressed) {
765  if (bgzf_flush(fp) != 0) return -1;
766  fp->compress_level = -1;
767  block_length = deflate_block(fp, 0); // write an empty block
768  if (hwrite(fp->fp, fp->compressed_block, block_length) < 0
769  || hflush(fp->fp) != 0) {
770  fp->errcode |= BGZF_ERR_IO;
771  return -1;
772  }
773 #ifdef BGZF_MT
774  if (fp->mt) mt_destroy(fp->mt);
775 #endif
776  }
777  if ( fp->is_gzip )
778  {
779  (void)inflateEnd(fp->gz_stream);
780  free(fp->gz_stream);
781  }
782  ret = hclose(fp->fp);
783  if (ret != 0) return -1;
784  bgzf_index_destroy(fp);
785  free(fp->uncompressed_block);
786  free(fp->compressed_block);
787  free_cache(fp);
788  free(fp);
789  return 0;
790 }
791 
792 void bgzf_set_cache_size(BGZF *fp, int cache_size)
793 {
794  if (fp) fp->cache_size = cache_size;
795 }
796 
798 {
799  uint8_t buf[28];
800  off_t offset = htell(fp->fp);
801  if (hseek(fp->fp, -28, SEEK_END) < 0) {
802  if (errno == ESPIPE) { hclearerr(fp->fp); return 2; }
803  else return -1;
804  }
805  if ( hread(fp->fp, buf, 28) != 28 ) return -1;
806  if ( hseek(fp->fp, offset, SEEK_SET) < 0 ) return -1;
807  return (memcmp("\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", buf, 28) == 0)? 1 : 0;
808 }
809 
810 int64_t bgzf_seek(BGZF* fp, int64_t pos, int where)
811 {
812  int block_offset;
813  int64_t block_address;
814 
815  if (fp->is_write || where != SEEK_SET) {
816  fp->errcode |= BGZF_ERR_MISUSE;
817  return -1;
818  }
819  block_offset = pos & 0xFFFF;
820  block_address = pos >> 16;
821  if (hseek(fp->fp, block_address, SEEK_SET) < 0) {
822  fp->errcode |= BGZF_ERR_IO;
823  return -1;
824  }
825  fp->block_length = 0; // indicates current block has not been loaded
826  fp->block_address = block_address << 16;
827  fp->block_offset = block_offset;
828  return 0;
829 }
830 
831 int bgzf_is_bgzf(const char *fn)
832 {
833  uint8_t buf[16];
834  int n;
835  hFILE *fp;
836  if ((fp = hopen(fn, "r")) == 0) return 0;
837  n = hread(fp, buf, 16);
838  if ( hclose(fp) < 0 ) return -1;
839  if (n != 16) return 0;
840  return memcmp(g_magic, buf, 16) == 0? 1 : 0;
841 }
842 
843 int bgzf_getc(BGZF *fp)
844 {
845  int c;
846  if (fp->block_offset >= fp->block_length) {
847  if (bgzf_read_block(fp) != 0) return -2; /* error */
848  if (fp->block_length == 0) return -1; /* end-of-file */
849  }
850  c = ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
851  if (fp->block_offset == fp->block_length) {
852  fp->block_address = htell(fp->fp);
853  fp->block_offset = 0;
854  fp->block_length = 0;
855  }
856  fp->uncompressed_address++;
857  return c;
858 }
859 
860 #ifndef kroundup32
861 #define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
862 #endif
863 
864 int bgzf_getline(BGZF *fp, int delim, kstring_t *str)
865 {
866  int l, state = 0;
867  unsigned char *buf = (unsigned char*)fp->uncompressed_block;
868  str->l = 0;
869  do {
870  if (fp->block_offset >= fp->block_length) {
871  if (bgzf_read_block(fp) != 0) { state = -2; break; }
872  if (fp->block_length == 0) { state = -1; break; }
873  }
874  for (l = fp->block_offset; l < fp->block_length && buf[l] != delim; ++l);
875  if (l < fp->block_length) state = 1;
876  l -= fp->block_offset;
877  if (str->l + l + 1 >= str->m) {
878  str->m = str->l + l + 2;
879  kroundup32(str->m);
880  str->s = (char*)realloc(str->s, str->m);
881  }
882  memcpy(str->s + str->l, buf + fp->block_offset, l);
883  str->l += l;
884  fp->block_offset += l + 1;
885  if (fp->block_offset >= fp->block_length) {
886  fp->block_address = htell(fp->fp);
887  fp->block_offset = 0;
888  fp->block_length = 0;
889  }
890  } while (state == 0);
891  if (str->l == 0 && state < 0) return state;
892  fp->uncompressed_address += str->l;
893  str->s[str->l] = 0;
894  return str->l;
895 }
896 
898 {
899  if ( !fp->idx ) return;
900  free(fp->idx->offs);
901  free(fp->idx);
902  fp->idx = NULL;
903  fp->idx_build_otf = 0;
904 }
905 
907 {
908  bgzf_index_destroy(fp);
909  fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
910  if ( !fp->idx ) return -1;
911  fp->idx_build_otf = 1; // build index on the fly
912  return 0;
913 }
914 
916 {
917  fp->idx->noffs++;
918  if ( fp->idx->noffs > fp->idx->moffs )
919  {
920  fp->idx->moffs = fp->idx->noffs;
921  kroundup32(fp->idx->moffs);
922  fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
923  if ( !fp->idx->offs ) return -1;
924  }
925  fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->ublock_addr;
926  fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->block_address;
927  return 0;
928 }
929 
930 int bgzf_index_dump(BGZF *fp, const char *bname, const char *suffix)
931 {
932  if (bgzf_flush(fp) != 0) return -1;
933 
934  assert(fp->idx);
935  char *tmp = NULL;
936  if ( suffix )
937  {
938  int blen = strlen(bname);
939  int slen = strlen(suffix);
940  tmp = (char*) malloc(blen + slen + 1);
941  if ( !tmp ) return -1;
942  memcpy(tmp,bname,blen);
943  memcpy(tmp+blen,suffix,slen+1);
944  }
945 
946  FILE *idx = fopen(tmp?tmp:bname,"wb");
947  if ( tmp ) free(tmp);
948  if ( !idx ) return -1;
949 
950  // Note that the index contains one extra record when indexing files opened
951  // for reading. The terminating record is not present when opened for writing.
952  // This is not a bug.
953 
954  int i;
955  if ( fp->is_be )
956  {
957  uint64_t x = fp->idx->noffs - 1;
958  fwrite(ed_swap_8p(&x), 1, sizeof(x), idx);
959  for (i=1; i<fp->idx->noffs; i++)
960  {
961  x = fp->idx->offs[i].caddr; fwrite(ed_swap_8p(&x), 1, sizeof(x), idx);
962  x = fp->idx->offs[i].uaddr; fwrite(ed_swap_8p(&x), 1, sizeof(x), idx);
963  }
964  }
965  else
966  {
967  uint64_t x = fp->idx->noffs - 1;
968  fwrite(&x, 1, sizeof(x), idx);
969  for (i=1; i<fp->idx->noffs; i++)
970  {
971  fwrite(&fp->idx->offs[i].caddr, 1, sizeof(fp->idx->offs[i].caddr), idx);
972  fwrite(&fp->idx->offs[i].uaddr, 1, sizeof(fp->idx->offs[i].uaddr), idx);
973  }
974  }
975  fclose(idx);
976  return 0;
977 }
978 
979 
980 int bgzf_index_load(BGZF *fp, const char *bname, const char *suffix)
981 {
982  char *tmp = NULL;
983  if ( suffix )
984  {
985  int blen = strlen(bname);
986  int slen = strlen(suffix);
987  tmp = (char*) malloc(blen + slen + 1);
988  if ( !tmp ) return -1;
989  memcpy(tmp,bname,blen);
990  memcpy(tmp+blen,suffix,slen+1);
991  }
992 
993  FILE *idx = fopen(tmp?tmp:bname,"rb");
994  if ( tmp ) free(tmp);
995  if ( !idx ) return -1;
996 
997  fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
998  uint64_t x;
999  if ( fread(&x, 1, sizeof(x), idx) != sizeof(x) ) return -1;
1000 
1001  fp->idx->noffs = fp->idx->moffs = 1 + (fp->is_be ? ed_swap_8(x) : x);
1002  fp->idx->offs = (bgzidx1_t*) malloc(fp->idx->moffs*sizeof(bgzidx1_t));
1003  fp->idx->offs[0].caddr = fp->idx->offs[0].uaddr = 0;
1004 
1005  int i;
1006  if ( fp->is_be )
1007  {
1008  int ret = 0;
1009  for (i=1; i<fp->idx->noffs; i++)
1010  {
1011  ret += fread(&x, 1, sizeof(x), idx); fp->idx->offs[i].caddr = ed_swap_8(x);
1012  ret += fread(&x, 1, sizeof(x), idx); fp->idx->offs[i].uaddr = ed_swap_8(x);
1013  }
1014  if ( ret != sizeof(x)*2*(fp->idx->noffs-1) ) return -1;
1015  }
1016  else
1017  {
1018  int ret = 0;
1019  for (i=1; i<fp->idx->noffs; i++)
1020  {
1021  ret += fread(&x, 1, sizeof(x), idx); fp->idx->offs[i].caddr = x;
1022  ret += fread(&x, 1, sizeof(x), idx); fp->idx->offs[i].uaddr = x;
1023  }
1024  if ( ret != sizeof(x)*2*(fp->idx->noffs-1) ) return -1;
1025  }
1026  fclose(idx);
1027  return 0;
1028 
1029 }
1030 
1031 int bgzf_useek(BGZF *fp, long uoffset, int where)
1032 {
1033  if ( !fp->is_compressed )
1034  {
1035  if (hseek(fp->fp, uoffset, SEEK_SET) < 0)
1036  {
1037  fp->errcode |= BGZF_ERR_IO;
1038  return -1;
1039  }
1040  fp->block_length = 0; // indicates current block has not been loaded
1041  fp->block_address = uoffset;
1042  fp->block_offset = 0;
1043  bgzf_read_block(fp);
1044  fp->uncompressed_address = uoffset;
1045  return 0;
1046  }
1047 
1048  if ( !fp->idx )
1049  {
1050  fp->errcode |= BGZF_ERR_IO;
1051  return -1;
1052  }
1053 
1054  // binary search
1055  int ilo = 0, ihi = fp->idx->noffs - 1;
1056  while ( ilo<=ihi )
1057  {
1058  int i = (ilo+ihi)*0.5;
1059  if ( uoffset < fp->idx->offs[i].uaddr ) ihi = i - 1;
1060  else if ( uoffset >= fp->idx->offs[i].uaddr ) ilo = i + 1;
1061  else break;
1062  }
1063  int i = ilo-1;
1064  if (hseek(fp->fp, fp->idx->offs[i].caddr, SEEK_SET) < 0)
1065  {
1066  fp->errcode |= BGZF_ERR_IO;
1067  return -1;
1068  }
1069  fp->block_length = 0; // indicates current block has not been loaded
1070  fp->block_address = fp->idx->offs[i].caddr;
1071  fp->block_offset = 0;
1072  if ( bgzf_read_block(fp) < 0 ) return -1;
1073  if ( uoffset - fp->idx->offs[i].uaddr > 0 )
1074  {
1075  fp->block_offset = uoffset - fp->idx->offs[i].uaddr;
1076  assert( fp->block_offset <= fp->block_length ); // todo: skipped, unindexed, blocks
1077  }
1078  fp->uncompressed_address = uoffset;
1079  return 0;
1080 }
1081 
1082 long bgzf_utell(BGZF *fp)
1083 {
1084  return fp->uncompressed_address; // currently maintained only when reading
1085 }
1086