NVBIO
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
output_bam.cpp
Go to the documentation of this file.
1 /*
2  * nvbio
3  * Copyright (c) 2011-2014, NVIDIA CORPORATION. All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * * Redistributions of source code must retain the above copyright
8  * notice, this list of conditions and the following disclaimer.
9  * * Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  * * Neither the name of the NVIDIA CORPORATION nor the
13  * names of its contributors may be used to endorse or promote products
14  * derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19  * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
20  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
30 #include <nvbio/basic/numbers.h>
31 #include <nvbio/basic/omp.h>
32 
33 #include <stdio.h>
34 #include <stdarg.h>
35 
36 namespace nvbio {
37 namespace io {
38 
39 BamOutput::BamOutput(const char *file_name, AlignmentType alignment_type, BNT bnt)
40  : OutputFile(file_name, alignment_type, bnt)
41 {
42  fp = fopen(file_name, "wt");
43  if (fp == NULL)
44  {
45  log_error(stderr, "BamOutput: could not open %s for writing\n", file_name);
46  return;
47  }
48 
49  // set a 256kb output buffer on fp and make sure it's not line buffered
50  // this makes sure small fwrites do not land on disk straight away
51  // (256kb was chosen based on the default stripe size for Linux mdraid RAID-5 volumes)
52  setvbuf(fp, NULL, _IOFBF, 256 * 1024);
53 
54  buffer_id = 0;
55 }
56 
58 {
59  if (fp)
60  {
61  fclose(fp);
62  fp = NULL;
63  }
64 }
65 
66 uint32 BamOutput::generate_cigar(struct BAM_alignment& alnh,
67  struct BAM_alignment_data_block& alnd,
68  const AlignmentData& alignment)
69 {
70  uint32 *output = alnd.cigar;
71  uint32 read_len = 0;
72 
73  for(uint32 i = 0; i < alignment.cigar_len; i++)
74  {
75  const Cigar& cigar_entry = alignment.cigar[alignment.cigar_len - i - 1u];
76  // convert our "MIDS" -> { 0, 1, 2, 3 } into BAM's "MIDS" -> {0, 1, 2, 4} encoding
77  const uint8 cigar_op = "\0\1\2\4"[cigar_entry.m_type];
78  const uint32 cigar_op_len = cigar_entry.m_len;
79 
80  output[i] = cigar_op_len << 4 | cigar_op;
81 
82  // check that we didn't overflow the CIGAR buffer
83  assert((unsigned long)(&output[i] - alnd.cigar) < (unsigned long) (sizeof(alnd.cigar) * sizeof(alnd.cigar[0]) - 1));
84 
85  // keep track of number of BPs in the original read
86  if (cigar_op != 2) // 2 == 'D'
87  read_len += cigar_entry.m_len;
88  }
89 
90  // output n_cigar_op
91  alnh.flag_nc = (alnh.flag_nc & 0xffff0000) | (alignment.cigar_len & 0xffff);
92 
93  return read_len;
94 }
95 
96 namespace {
97 // utility function to convert an int to a base-10 string representation
98 template <typename T> int itoa(char *buf, T in)
99 {
100  int len = 0;
101  bool negative = false;
102 
103  // track the sign
104  if (in < 0)
105  {
106  negative = true;
107  in = -in;
108  }
109 
110  // convert to base10
111  do
112  {
113  buf[len] = "0123456789"[in % 10];
114  in /= 10;
115  len++;
116  } while(in);
117 
118  // add sign
119  if (negative)
120  {
121  buf[len] = '-';
122  len++;
123  }
124 
125  // reverse
126  for(int c = 0; c < len / 2; c++)
127  {
128  char tmp;
129  tmp = buf[c];
130  buf[c] = buf[len - c - 1];
131  buf[len - c - 1] = tmp;
132  }
133 
134  // terminate
135  buf[len] = 0;
136  return len;
137 }
138 }
139 
140 // generate the MD string
141 uint32 BamOutput::generate_md_string(BAM_alignment& alnh, BAM_alignment_data_block& alnd, const AlignmentData& alignment)
142 {
143  if (alignment.mds_vec == NULL)
144  {
145  log_warning(stderr, " BAM: alignment %u from read %u has an empty MD string\n", alignment.aln_id, alignment.read_id);
146  alnd.md_string[0] = '\0';
147  return 0;
148  }
149  const uint32 mds_len = uint32(alignment.mds_vec[0]) | (uint32(alignment.mds_vec[1]) << 8);
150  char *buffer = alnd.md_string;
151  uint32 buffer_len = 0;
152 
153  uint32 i;
154 
155  alnd.mm = 0;
156  alnd.gapo = 0;
157  alnd.gape = 0;
158 
159  i = 2;
160  do
161  {
162  const uint8 op = alignment.mds_vec[i++];
163  switch (op)
164  {
165  case MDS_MATCH:
166  {
167  uint8 l = alignment.mds_vec[i++];
168 
169  // prolong the MDS match if it spans multiple tokens
170  while (i < mds_len && alignment.mds_vec[i] == MDS_MATCH)
171  l += alignment.mds_vec[i++];
172 
173  buffer_len += itoa(buffer + buffer_len, l);
174  }
175 
176  break;
177 
178  case MDS_MISMATCH:
179  {
180  const char c = dna_to_char(alignment.mds_vec[i++]);
181  buffer[buffer_len++] = c;
182 
183  alnd.mm++;
184  }
185 
186  break;
187 
188  case MDS_INSERTION:
189  {
190  const uint8 l = alignment.mds_vec[i++];
191  i += l;
192 
193  alnd.gapo++;
194  alnd.gape += l - 1;
195  }
196 
197  break;
198 
199  case MDS_DELETION:
200  {
201  const uint8 l = alignment.mds_vec[i++];
202  buffer[buffer_len++] = '^';
203  for(uint8 n = 0; n < l; n++)
204  {
205  buffer[buffer_len++] = dna_to_char(alignment.mds_vec[i++]);
206  }
207 
208  buffer[buffer_len++] = '0';
209 
210  alnd.gapo++;
211  alnd.gape += l - 1;
212  }
213 
214  break;
215  }
216  } while(i < mds_len);
217 
218  buffer[buffer_len] = '\0';
219  return buffer_len;
220 }
221 
222 // convert our ACGT = {0, 1, 2, 3} into BAM's ACGT = {1, 2, 4, 8} encoding
223 uint8 BamOutput::encode_bp(uint8 bp)
224 {
225  if (bp > 3)
226  {
227  // unknown BP, map to N
228  return 15;
229  } else {
230  return 1 << bp;
231  }
232 }
233 
234 uint32 BamOutput::process_one_alignment(AlignmentData& alignment, AlignmentData& mate)
235 {
236  // BAM alignment header
237  struct BAM_alignment alnh;
238  // data block with actual alignment info
239  struct BAM_alignment_data_block alnd;
240 
241  uint8 mapq;
242 
243  // xxxnsubtil: this is probably not needed
244  memset(&alnh, 0, sizeof(alnh));
245  memset(&alnd, 0, sizeof(alnd));
246 
247  const uint32 ref_cigar_len = reference_cigar_length(alignment.cigar, alignment.cigar_len);
248 
249  // setup alignment information
250  const uint32 seq_index = uint32(std::upper_bound(
253  alignment.cigar_pos ) - bnt.sequence_index) - 1u;
254 
255  // fill out read name and length
256  alnd.name = alignment.read_name;
257  alnh.bin_mq_nl = (uint8)(strlen(alnd.name) + 1);
258 
259  // fill out read data
260  // (PackedStream is not used here to avoid doing a read-modify-write on every BP)
261  {
262  for(uint32 i = 0; i < alignment.read_len; i += 2)
263  {
264  uint8 out_bp;
265  uint8 s;
266 
267  if (alignment.aln->m_rc)
268  {
269  nvbio::complement_functor<4> complement;
270 
271  s = complement(alignment.read_data[i]);
272  s = encode_bp(s);
273  out_bp = s << 4;
274 
275  if (i + 1 < alignment.read_len)
276  {
277  s = complement(alignment.read_data[(i + 1)]);
278  s = encode_bp(s);
279  out_bp |= s;
280  }
281  } else {
282  s = alignment.read_data[alignment.read_len - i - 1];
283  s = encode_bp(s);
284  out_bp = s << 4;
285 
286  if (i + 1 < alignment.read_len)
287  {
288  s = alignment.read_data[alignment.read_len - (i + 1) - 1];
289  s = encode_bp(s);
290  out_bp |= s;
291  }
292  }
293 
294  alnd.seq[i / 2] = out_bp;
295  }
296 
297  alnh.l_seq = alignment.read_len;
298  }
299 
300  // fill out quality data
301  for(uint32 i = 0; i < alignment.read_len; i++)
302  {
303  char q;
304 
305  if (alignment.aln->m_rc)
306  q = alignment.qual[i];
307  else
308  q = alignment.qual[alignment.read_len - i - 1];
309 
310  alnd.qual[i] = q;
311  }
312 
313  // compute mapping quality
314  mapq = alignment.mapq;
315 
316  // check if we're mapped
317  if (alignment.aln->is_aligned() == false || mapq < mapq_filter)
318  {
319  alnh.refID = -1;
320  alnh.pos = -1;
321  alnh.flag_nc = BAM_FLAGS_UNMAPPED;
322  alnh.next_refID = -1;
323  alnh.next_pos = -1;
324  // mark the md string as empty
325  alnd.md_string[0] = '\0';
326 
327  // unaligned reads don't need anything else; output and return
328  output_alignment(alnh, alnd);
329  return 0;
330  }
331 
332  // compute alignment flags
333  alnh.flag_nc = (alignment.aln->mate() ? BAM_FLAGS_READ_2 : BAM_FLAGS_READ_1);
334  if (alignment.aln->m_rc)
335  alnh.flag_nc |= BAM_FLAGS_REVERSE;
336 
337  if (alignment_type == PAIRED_END)
338  {
339  alnh.flag_nc |= BAM_FLAGS_PAIRED;
340 
341  if (mate.aln->is_concordant())
342  alnh.flag_nc |= BAM_FLAGS_PROPER_PAIR;
343 
344  if (!mate.aln->is_aligned())
345  alnh.flag_nc |= BAM_FLAGS_MATE_UNMAPPED;
346 
347  if (mate.aln->is_rc())
348  alnh.flag_nc |= BAM_FLAGS_MATE_REVERSE;
349  }
350 
351  if (alignment.cigar_pos + ref_cigar_len > bnt.sequence_index[ seq_index+1 ])
352  {
353  // flag UNMAP as this alignment bridges two adjacent reference sequences
354  // xxxnsubtil: we still output the rest of the alignment data, does that make sense?
355  alnh.flag_nc |= BAM_FLAGS_UNMAPPED;
356 
357  // make this look like a real unmapped alignment
358  alnh.refID = -1;
359  alnh.pos = -1;
360  alnh.next_refID = -1;
361  alnh.next_pos = -1;
362  alnd.md_string[0] = '\0';
363 
364  output_alignment(alnh, alnd);
365  return 0;
366  }
367 
368  // fill out alignment reference ID and position
369  alnh.refID = seq_index;
370  alnh.pos = uint32(alignment.cigar_pos - bnt.sequence_index[ seq_index ]);
371 
372  // write out mapq
373  alnh.bin_mq_nl |= (mapq << 8);
374  // BAM alignment bin is always 0
375  // xxxnsubtil: is the bin useful?
376 
377  // fill out the cigar string...
378  uint32 computed_cigar_len = generate_cigar(alnh, alnd, alignment);
379  // ... and make sure it makes (some) sense
380  if (computed_cigar_len != alignment.read_len)
381  {
382  log_error(stderr, "BAM output : cigar length doesn't match read %u (%u != %u)\n",
383  alignment.read_id /* xxxnsubtil: global_read_id */,
384  computed_cigar_len, alignment.read_len);
385  return mapq;
386  }
387 
388  if (alignment_type == PAIRED_END)
389  {
390  if (mate.aln->is_aligned())
391  {
392  const uint32 o_ref_cigar_len = reference_cigar_length(mate.cigar, mate.cigar_len);
393 
394  // setup alignment information for the opposite mate
395  const uint32 o_seq_index = uint32(std::upper_bound(
398  mate.cigar_pos ) - bnt.sequence_index) - 1u;
399 
400  alnh.next_refID = uint32(o_seq_index - seq_index);
401  // next_pos here is equivalent to SAM's PNEXT,
402  // but it's zero-based in BAM and one-based in SAM
403  alnh.next_pos = int32( mate.cigar_pos - bnt.sequence_index[ o_seq_index ] );
404 
405  if (o_seq_index != seq_index)
406  alnh.tlen = 0;
407  else
408  {
409  alnh.tlen = nvbio::max(mate.cigar_pos + o_ref_cigar_len,
410  alignment.cigar_pos + ref_cigar_len) -
411  nvbio::min(mate.cigar_pos, alignment.cigar_pos);
412 
413  if (mate.cigar_pos < alignment.cigar_pos)
414  alnh.tlen = -alnh.tlen;
415  }
416  }
417  else
418  {
419  // other mate is unmapped
420  // xxxnsubtil: this follows the same convention that was documented in the old code for SAM,
421  // except that BAM does not have an encoding for '=' here
422  // it's somewhat unclear whether this is correct
423  alnh.next_refID = alnh.refID;
424  alnh.next_pos = int32( alignment.cigar_pos - bnt.sequence_index[ seq_index ] );
425  // xxx: check whether this is really correct
426  alnh.tlen = 0;
427  }
428  } else {
429  alnh.next_refID = -1;
430  alnh.next_pos = -1;
431  alnh.tlen = 0;
432  }
433 
434  // fill out tag data
435  alnd.ed = alignment.aln->ed();
436  alnd.score = alignment.aln->score();
437 
438  alnd.second_score_valid = false; // TODO!
439 
440  generate_md_string(alnh, alnd, alignment);
441 
442  // write out the alignment
443  output_alignment(alnh, alnd);
444 
445  return mapq;
446 }
447 
448 void BamOutput::output_tag_uint32(DataBuffer& out, const char *tag, uint32 val)
449 {
450  out.append_data(tag, 2);
451  out.append_uint8('i');
452  out.append_uint32(val);
453 }
454 
455 void BamOutput::output_tag_uint8(DataBuffer& out, const char *tag, uint8 val)
456 {
457  out.append_data(tag, 2);
458  out.append_uint8('c');
459  out.append_uint8(val);
460 }
461 
462 void BamOutput::output_tag_string(DataBuffer& out, const char *tag, const char *val)
463 {
464  out.append_data(tag, 2);
465  out.append_uint8('Z');
466  out.append_string(val);
467  out.append_uint8('\0');
468 }
469 
470 void BamOutput::output_alignment(BAM_alignment& alnh, BAM_alignment_data_block& alnd)
471 {
472  DataBuffer& out = data_buffers[ buffer_id ];
473 
474  // keep track of the block size offset so we can compute the block size and update it later
475  uint32 off_block_size = out.get_pos();
476  out.skip_ahead(sizeof(alnh.block_size));
477 
478  out.append_int32(alnh.refID);
479  out.append_int32(alnh.pos);
480  out.append_int32(alnh.bin_mq_nl);
481  out.append_int32(alnh.flag_nc);
482  out.append_int32(alnh.l_seq);
483  out.append_int32(alnh.next_refID);
484  out.append_int32(alnh.next_pos);
485  out.append_int32(alnh.tlen);
486 
487  out.append_string(alnd.name);
488  out.append_uint8('\0');
489 
490  // cigar_len = n_cigar_op (lower 16-bits of flag_nc) * sizeof(uint32)
491  const uint32 cigar_len = (alnh.flag_nc & 0xffff) * sizeof(uint32);
492  out.append_data(alnd.cigar, cigar_len);
493 
494  out.append_data(alnd.seq, (alnh.l_seq + 1) / 2);
495  out.append_data(alnd.qual, alnh.l_seq);
496 
497  // unmapped alignments don't get auxiliary data
498  if (!(alnh.flag_nc & BAM_FLAGS_UNMAPPED))
499  {
500  output_tag_uint8(out, "NM", alnd.ed);
501  output_tag_uint32(out, "AS", alnd.score);
502 
503  if (alnd.second_score_valid)
504  output_tag_uint32(out, "XS", alnd.second_score);
505 
506  output_tag_uint8(out, "XM", alnd.mm);
507  output_tag_uint8(out, "XO", alnd.gapo);
508  output_tag_uint8(out, "XG", alnd.gape);
509  if (alnd.md_string[0])
510  output_tag_string(out, "MD", alnd.md_string);
511  }
512 
513  // compute alignment data block size and write it out
514  const int32 aln_len = out.get_pos() - off_block_size - sizeof(alnh.block_size);
515  out.poke_int32(off_block_size, aln_len);
516 
517  if (out.is_full())
518  write_block();
519 }
520 
522 {
523  // protect this section
524  ScopedLock lock( &mutex );
525 
526  float time = 0.0f;
527  {
528  ScopedTimer<float> timer( &time );
529 
530  for(uint32 c = 0; c < batch.count; c++)
531  {
532  AlignmentData alignment = get(batch, c);
534 
535  process_one_alignment(alignment, mate);
536  }
537 
538  // flush at the end of each batch
539  //if (data_buffers[ buffer_id ].get_pos())
540  // flush_blocks();
541  }
542  iostats.n_reads += batch.count;
543  iostats.output_process_timings.add( batch.count, time );
544 }
545 
547 {
548  // protect this section
549  ScopedLock lock( &mutex );
550 
551  float time = 0.0f;
552  {
553  ScopedTimer<float> timer( &time );
554 
555  for(uint32 c = 0; c < batch.count; c++)
556  {
557  AlignmentData alignment = get_anchor_mate(batch,c);
558  AlignmentData mate = get_opposite_mate(batch,c);
559 
560  process_one_alignment(alignment, mate);
561  process_one_alignment(mate, alignment);
562  }
563 
564  // flush at the end of each batch
565  //if (data_buffers[ buffer_id ].get_pos())
566  // flush_blocks();
567  }
568  iostats.n_reads += batch.count;
569  iostats.output_process_timings.add( batch.count, time );
570 }
571 
572 void BamOutput::write_block()
573 {
574  ++buffer_id;
575 
576  // check whether we need to flush the blocks
577  if (buffer_id == BUFFERS)
578  flush_blocks();
579 }
580 
581 void BamOutput::flush_blocks()
582 {
583  #pragma omp parallel for
584  for (int32 i = 0; i < buffer_id; ++i)
585  {
586  bgzf[i].start_block( compressed_buffers[i] );
587  bgzf[i].compress( compressed_buffers[i], data_buffers[i] );
588  bgzf[i].end_block( compressed_buffers[i] );
589 
590  data_buffers[i].rewind();
591  }
592 
593  // write out the compressed buffers, in order
594  for (int32 i = 0; i < buffer_id; ++i)
595  {
596  // write the compressed buffer
597  fwrite( compressed_buffers[i].get_base_ptr(), compressed_buffers[i].get_pos(), 1, fp );
598 
599  // and rewind it
600  compressed_buffers[i].rewind();
601  }
602 
603  // reset the buffer id and the work-queue counter
604  buffer_id = 0;
605 }
606 
607 void BamOutput::output_header(void)
608 {
609  int pos_l_text, pos_start_header, header_len;
610 
611  // names in parenthesis refer to the field names in the BAM spec
612  DataBuffer& data_buffer = data_buffers[ buffer_id ];
613 
614  // write magic string (magic)
615  data_buffer.append_string("BAM\1");
616  // skip ahead header length field (l_text), will fill later
617  pos_l_text = data_buffer.get_pos();
618  data_buffer.skip_ahead(sizeof(uint32));
619 
620  pos_start_header = data_buffer.get_pos();
621 
622  // fill out SAM header (text)
623  data_buffer.append_string("@HD\t");
624  data_buffer.append_string("VN:1.3\n");
625  if (!rg_id.empty())
626  {
627  // write the RG:ID
628  data_buffer.append_string("@RG");
629  data_buffer.append_string("\tID:");
630  data_buffer.append_string(rg_id.c_str());
631 
632  // write the other RG tags
633  if (!rg_string.empty())
634  data_buffer.append_string( rg_string.c_str() );
635 
636  data_buffer.append_string("\n");
637  }
638  data_buffer.append_string("@PG");
639  data_buffer.append_string("\tID:");
640  data_buffer.append_string(pg_id.c_str());
641  data_buffer.append_string("\tPN:");
642  data_buffer.append_string(pg_name.c_str());
643  // VN was bumped to 0.5.1 to distinguish between the new and old output code
644  data_buffer.append_string("\tVN:");
645  data_buffer.append_string(pg_version.c_str());
646  data_buffer.append_string("\n");
647  // samtools does not cope with a null terminator here, so don't write one out
648 // data_buffer.append_int8(0);
649 
650  // compute and write out the size of the SAM header (l_text)
651  header_len = data_buffer.get_pos() - pos_start_header;
652  data_buffer.poke_int32(pos_l_text, header_len);
653 
654  // output the number of reference sequences (n_ref)...
655  data_buffer.append_int32(bnt.n_seqs);
656  // ... and the information for each
657  for (uint32 i = 0; i < bnt.n_seqs; i++)
658  {
659  const char *name = bnt.names + bnt.names_index[i];
660 
661  // sequence name length including null terminator (l_name)
662  data_buffer.append_int32( int32( strlen(name) + 1 ) );
663  // write out sequence name string and null-terminator (name)
664  data_buffer.append_string(name);
665  data_buffer.append_int8(0);
666  // sequence length (l_ref)
667  data_buffer.append_int32(bnt.sequence_index[i+1] - bnt.sequence_index[i]);
668  }
669 
670  // compress and write out the header block separately
671  // (this yields a slightly smaller file)
672  write_block();
673 }
674 
676 {
677  // protect this section
678  ScopedLock lock( &mutex );
679 
680  // flush all non-emtpy blocks
681  if (data_buffers[ buffer_id ].get_pos())
682  write_block();
683 
684  flush_blocks();
685 
686  NVBIO_CUDA_ASSERT(fp);
687 
688  // write out the BAM EOF marker
689  static const unsigned char magic[28] = { 0037, 0213, 0010, 0004, 0000, 0000, 0000, 0000, 0000,
690  0377, 0006, 0000, 0102, 0103, 0002, 0000, 0033, 0000,
691  0003, 0000, 0000, 0000, 0000, 0000, 0000, 0000, 0000, 0000 };
692 
693  fwrite(magic, sizeof(magic), 1, fp);
694 
695  fclose(fp);
696  fp = NULL;
697 }
698 
699 } // namespace io
700 } // namespace nvbio