aligner_all.h
/*
 * nvbio
 * Copyright (c) 2011-2014, NVIDIA CORPORATION. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *    * Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *    * Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *      documentation and/or other materials provided with the distribution.
 *    * Neither the name of the NVIDIA CORPORATION nor the
 *      names of its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <nvbio/basic/cuda/ldg.h>
#include <nvbio/basic/primitives.h>

#include <thrust/iterator/transform_iterator.h>
#include <thrust/adjacent_difference.h>

namespace nvbio {
namespace bowtie2 {
namespace cuda {

template <typename scoring_tag>
void Aligner::all(
    const Params&                   params,
    const fmi_type                  fmi,
    const rfmi_type                 rfmi,
    const UberScoringScheme&        input_scoring_scheme,
    const io::SequenceDataDevice&   reference_data,
    const io::FMIndexDataDevice&    driver_data,
    const io::SequenceDataDevice&   read_data,
    io::HostOutputBatchSE&          cpu_batch,
    Stats&                          stats)
{
    // prepare the scoring system
    typedef typename ScoringSchemeSelector<scoring_tag>::type  scoring_scheme_type;
    typedef typename scoring_scheme_type::threshold_score_type threshold_score_type;

    scoring_scheme_type scoring_scheme = ScoringSchemeSelector<scoring_tag>::scheme( input_scoring_scheme );

    // cast the reads to use proper iterators
    const read_batch_type reads( plain_view( read_data ) );

    Timer timer;
    Timer global_timer;

    const uint32 count = read_data.size();
    //const uint32 band_len = band_length( params.max_dist );

    seed_queues.in_size = count;

    thrust::copy(                                   // this is only done to pass a list of tasks to the
        thrust::make_counting_iterator(0u),         // score_all method
        thrust::make_counting_iterator(0u) + count,
        seed_queues.in_queue.begin() );

    // clear mapq's
    thrust::fill( mapq_dvec.begin(), mapq_dvec.end(), uint8(255) );

    //
    // Unlike Bowtie2, in the context of all-mapping we perform a single seed & extension pass.
    // However, during seed mapping, if we find too many hits for a given seeding pattern,
    // we try another one with shifted seeds.
    //

    uint64 n_alignments = 0;

    uint32 max_seeds = 0u;
    for (uint32 s = read_data.min_sequence_len(); s <= read_data.max_sequence_len(); ++s)
        max_seeds = nvbio::max( max_seeds, uint32( s / params.seed_freq(s) ) );
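
    // For illustration (hypothetical numbers; params.seed_freq is a run-time function of
    // the read length): with 100bp reads and a constant frequency of one seed every 16bp,
    // the loop above yields max_seeds = 100/16 = 6 seeds per read.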

#if 0
    for (uint32 seed = 0; seed < max_seeds; ++seed)
    {
        // initialize the seed hit counts
        hit_deques.clear_deques();

        log_debug(stderr, "    map seed %u\n", seed);

        //
        // perform mapping
        //
        SeedHitDequeArrayDeviceView hits = hit_deques.device_view();

        if (params.allow_sub == 0)
        {
            timer.start();

            map_exact(
                reads, fmi, rfmi,
                hits,
                make_uint2( seed, seed+1 ),
                params,
                params.fw,
                params.rc );

            optional_device_synchronize();
            nvbio::cuda::check_error("mapping kernel");

            timer.stop();
            stats.map.add( count, timer.seconds() );
        }
        else
        {
            timer.start();

            map_approx(
                reads, fmi, rfmi,
                hits,
                make_uint2( seed, seed+1 ),
                params,
                params.fw,
                params.rc );

            optional_device_synchronize();
            nvbio::cuda::check_error("mapping kernel");

            timer.stop();
            stats.map.add( count, timer.seconds() );
        }

        // take some stats on the hits we got
        if (params.keep_stats)
            keep_stats( reads.size(), stats );

        log_debug(stderr, "    score\n");
        score_all(
            params,
            fmi,
            rfmi,
            input_scoring_scheme,
            scoring_scheme,
            reference_data,
            driver_data,
            read_data,
            cpu_batch,
            count,
            nvbio::device_view( seed_queues.in_queue ),
            stats,
            n_alignments );
    }
#else
    // initialize the seed hit counts
    hit_deques.clear_deques();

    //
    // perform mapping
    //
    log_debug(stderr, "    map (%u seeds)\n", max_seeds);
    {
        SeedHitDequeArrayDeviceView hits = hit_deques.device_view();

        if (params.allow_sub == 0)
        {
            timer.start();

            map_exact(
                reads, fmi, rfmi,
                hits,
                make_uint2( 0u, max_seeds ),
                params,
                params.fw,
                params.rc );

            optional_device_synchronize();
            nvbio::cuda::check_error("mapping kernel");

            timer.stop();
            stats.map.add( count, timer.seconds() );
        }
        else
        {
            timer.start();

            map_approx(
                reads, fmi, rfmi,
                hits,
                make_uint2( 0u, max_seeds ),
                params,
                params.fw,
                params.rc );

            optional_device_synchronize();
            nvbio::cuda::check_error("mapping kernel");

            timer.stop();
            stats.map.add( count, timer.seconds() );
        }

        // take some stats on the hits we got
        if (params.keep_stats)
            keep_stats( reads.size(), stats );
    }

    log_debug(stderr, "    score\n");
    score_all(
        params,
        fmi,
        rfmi,
        input_scoring_scheme,
        scoring_scheme,
        reference_data,
        driver_data,
        read_data,
        cpu_batch,
        count,
        nvbio::device_view( seed_queues.in_queue ),
        stats,
        n_alignments );
#endif
}

// a simple functor to merge the read-id, RC flag and location into a single 64-bit sorting key
//
struct SortingKeys
{
    typedef uint32 argument_type;
    typedef uint64 result_type;

    SortingKeys(const HitQueuesDeviceView _hits) : hits( _hits ) {}

    NVBIO_FORCEINLINE NVBIO_HOST_DEVICE
    uint64 operator() (const uint32 i) const
    {
        const uint64 loc     = hits.loc[i];
        const uint64 read_id = hits.read_id[i];
        const uint64 rc      = hits.seed[i].rc;
        return loc + (read_id << 33) + (rc << 32);
    }

    const HitQueuesDeviceView hits;
};

// a pseudo-iterator to evaluate the predicate (it[i] != it[i-1]) for arbitrary iterators
//
template <typename Iterator>
struct difference_transform
{
    typedef uint32 argument_type;
    typedef bool   result_type;

    // constructor
    NVBIO_FORCEINLINE NVBIO_HOST_DEVICE
    difference_transform(const Iterator _it) : it( _it ) {}

    NVBIO_FORCEINLINE NVBIO_HOST_DEVICE
    bool operator() (const uint32 i) const { return it[i] != it[i-1]; }

    Iterator it;
};
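
// Illustrative example: applied to the sorted keys k = [3,3,5,7,7], the transform
// writes flags[i] = (k[i] != k[i-1]) for i in [1,5), i.e. flags = [., 0, 1, 1, 0];
// score_all below then forces flags[0] = 1, so that compacting by flags keeps
// exactly one hit per distinct key.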

template <typename scoring_scheme_type>
void Aligner::score_all(
    const Params&                   params,
    const fmi_type                  fmi,
    const rfmi_type                 rfmi,
    const UberScoringScheme&        input_scoring_scheme,
    const scoring_scheme_type&      scoring_scheme,
    const io::SequenceDataDevice&   reference_data,
    const io::FMIndexDataDevice&    driver_data,
    const io::SequenceDataDevice&   read_data,
    io::HostOutputBatchSE&          cpu_batch,
    const uint32                    seed_queue_size,
    const uint32*                   seed_queue,
    Stats&                          stats,
    uint64&                         total_alignments)
{
    // prepare the scoring system
    //typedef typename scoring_scheme_type::threshold_score_type threshold_score_type;

    //threshold_score_type threshold_score = scoring_scheme.threshold_score();
    const int32 score_limit = scoring_scheme_type::worst_score;

    Timer timer;
    Timer global_timer;

    const uint32 count    = read_data.size();
    const uint32 band_len = band_length( params.max_dist );

    // cast the reads to use proper iterators
    const read_batch_type reads( plain_view( read_data ) );

    // cast the genome to use proper iterators
    const io::LdgSequenceDataView genome_view( plain_view( reference_data ) );
    const genome_access_type genome_access( genome_view );
    const uint32          genome_len = genome_access.bps();
    const genome_iterator genome     = genome_access.sequence_stream();

    //
    // At this point we have a queue full of reads, each with an associated set of
    // seed hits encoded as a (sorted) list of SA ranges.
    // For each read we need to:
    //      1. select some seed hit to process (i.e. a row in one of the SA ranges)
    //      2. locate it, i.e. convert it from SA to linear coordinates
    //      3. score it
    //      4. if the score is acceptable, backtrack the extension
    // The whole process is carried out as a pipeline composed of 4 different stages
    // communicating through work queues.
    // At the end of each pipeline iteration, the reads which are still alive, in the
    // sense that they contain more seed hits to process, are pushed to an output queue.
    // The output queue is then reused in the next round as the input queue, and
    // vice versa.
    //
    // NOTE: in all-mapping mode, we ultimately need to explore the set of seed hits
    // of each read exhaustively, and report all the alignments found.
    // If we processed the seed hits from all reads simultaneously, reporting all their
    // alignments sorted by read would require an external sorting pass, as the amount of
    // storage could be fairly dramatic.
    // However, we can also simply process all the seed hits from the first read first,
    // followed by all the ones from the second read, and so on.
    // This also has the advantage of sharing the same read data across many seed extensions,
    // lowering the overall memory traffic.
    // In order to distribute work evenly we can (as sketched below):
    //      run a scan on the number of SA ranges from each read                     (A)
    //      run a scan on all the SA range sizes                                     (B)
    //      assign a thread per hit, and find the corresponding SA range by a binary search in (B),
    //      followed by another binary search in (A) to find the corresponding read.
    //
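    // A minimal sketch of the last step (illustrative only: assign_hit and its
    // parameters are hypothetical helpers, not part of this file). Given the two
    // scans, each thread can map its global hit index to a (range, read) pair with
    // two upper-bound binary searches:
    //
    //     NVBIO_FORCEINLINE NVBIO_HOST_DEVICE
    //     uint2 assign_hit(
    //         const uint64  hit,        // global hit index in [0, n_hits)
    //         const uint32* A,          // inclusive scan of SA-range counts per read
    //         const uint32  n_reads,
    //         const uint64* B,          // inclusive scan of SA-range sizes
    //         const uint32  n_ranges)
    //     {
    //         const uint32 range = uint32( nvbio::upper_bound( hit,   B, n_ranges ) - B );
    //         const uint32 read  = uint32( nvbio::upper_bound( range, A, n_reads  ) - A );
    //         return make_uint2( range, read );
    //     }
    //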

    // run a scan on the number of SA ranges from each read
    thrust::inclusive_scan( hit_deques.counts().begin(), hit_deques.counts().begin() + count, hits_count_scan_dvec.begin() );

    const uint32 n_reads = reads.size();

    // gather all the range sizes, sorted by read, in a compacted array
    const uint32 n_hit_ranges = hits_count_scan_dvec[ count-1 ];
    if (n_hit_ranges == 0)
        return;

    log_verbose(stderr, "    ranges : %u\n", n_hit_ranges);

    SeedHitDequeArrayDeviceView hits = hit_deques.device_view();

    gather_ranges(
        n_hit_ranges,
        n_reads,
        hits,
        hits_count_scan_dptr,
        hits_range_scan_dptr );

    optional_device_synchronize();
    nvbio::cuda::check_error("gather ranges kernel");

    // run a scan on all the SA range sizes
    thrust::inclusive_scan( hits_range_scan_dvec.begin(), hits_range_scan_dvec.begin() + n_hit_ranges, hits_range_scan_dvec.begin() );

    const uint64 n_hits = hits_range_scan_dvec[ n_hit_ranges-1 ];
    log_verbose(stderr, "    hits : %llu\n", n_hits);

    #if defined(NVBIO_CUDA_DEBUG)
    {
        uint64 total_hits = 0;
        thrust::host_vector<uint32>  h_counts = hit_deques.counts();
        thrust::host_vector<SeedHit> h_hits   = hit_deques.hits();
        thrust::host_vector<uint32>  h_index  = hit_deques.index();
        for (uint32 i = 0; i < n_reads; ++i)
        {
            uint32 cnt   = h_counts[i];
            uint32 index = h_index[i];
            for (uint32 j = 0; j < cnt; ++j)
            {
                const SeedHit hit = h_hits[ index + j ];
                total_hits += hit.get_range().y - hit.get_range().x;
            }
        }
        if (n_hits != total_hits)
        {
            log_error(stderr, "    expected %llu hits, computed %llu\n", total_hits, n_hits);
            throw nvbio::runtime_error("invalid number of hits");
        }
    }
    #endif

    //
    // Loop through batches of BATCH_SIZE seed hits. The seed hits are extracted
    // from the compressed variable-sized list of SA ranges, located and scored.
    // If the extension is successful, they are queued to a ring-buffer for
    // back-tracking. The ring-buffer is twice as large as the batch size, so as
    // to allow temporary overflows.
    //

    uint32 buffer_offset = 0;
    uint32 buffer_count  = 0;
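
    // To illustrate the ring-buffer arithmetic used below (hypothetical numbers):
    // with BATCH_SIZE = 4 the buffer has 2*BATCH_SIZE = 8 slots; if buffer_offset = 6
    // and buffer_count = 3, the live alignments occupy slots 6, 7 and 0, and consuming
    // n_backtracks = 3 of them advances buffer_offset to (6 + 3) % 8 = 1.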

    typedef AllMappingPipelineState<scoring_scheme_type> pipeline_type;

    pipeline_type pipeline(
        0u,
        reads,
        reads,
        genome_len,
        genome,
        fmi,
        rfmi,
        scoring_scheme,
        score_limit,
        *this );

    // setup the hits queue according to whether we select multiple hits per read or not

    HitQueues& hit_queues = scoring_queues.hits;

//  uint32* score_idx_dptr = nvbio::device_view( hit_queues.ssa );

    for (uint64 hit_offset = 0; hit_offset < n_hits; hit_offset += BATCH_SIZE)
    {
        const uint32 hit_count = std::min( uint32(n_hits - hit_offset), BATCH_SIZE );

        timer.start();

        log_debug(stderr, "    select\n");
        select_all(
            hit_offset,
            hit_count,
            n_reads,
            n_hit_ranges,
            n_hits,
            hits,
            hits_count_scan_dptr,
            hits_range_scan_dptr,
            nvbio::device_view( hit_queues ) );

        optional_device_synchronize();
        nvbio::cuda::check_error("selecting kernel");

        timer.stop();
        stats.select.add( hit_count, timer.seconds() );

        // update pipeline
        pipeline.n_hits_per_read = 1u;
        pipeline.hits_queue_size = hit_count;
        pipeline.scoring_queues  = scoring_queues.device_view();

        log_debug(stderr, "    locate\n");
        timer.start();

        pipeline.idx_queue = sort_hi_bits(
            hit_count,
            nvbio::device_view( hit_queues.loc ) );

        timer.stop();
        stats.sort.add( hit_count, timer.seconds() );

        timer.start();

        // and locate their position in linear coordinates
        locate_init(
            reads, fmi, rfmi,
            hit_count,
            pipeline.idx_queue,
            nvbio::device_view( hit_queues ),
            params );

        optional_device_synchronize();
        nvbio::cuda::check_error("locate-init kernel");

        locate_lookup(
            reads, fmi, rfmi,
            hit_count,
            pipeline.idx_queue,
            nvbio::device_view( hit_queues ),
            params );

        optional_device_synchronize();
        nvbio::cuda::check_error("locate-lookup kernel");

        timer.stop();
        stats.locate.add( hit_count, timer.seconds() );

        timer.start();

        // sort the selected hits by their linear genome coordinate
        const SortingKeys make_keys( nvbio::device_view( hit_queues ) );

        std::pair<uint32*,uint64*> sorting_idx = sort_64_bits(
            hit_count,
            thrust::make_transform_iterator( thrust::make_counting_iterator<uint32>(0), make_keys ) );
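
        // NOTE: after this sort, hits sharing the same (read, strand, location) key
        // are adjacent in the sorted key array (sorting_idx.second), which is what
        // lets the difference_transform pass below flag the first hit of each run
        // and discard the duplicates.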

        if (1)
        {
            log_debug(stderr, "    dedup");
            nvbio::transform<device_tag>(
                hit_count-1u,
                thrust::make_counting_iterator<uint32>(0) + 1u,
                flags_dvec.begin() + 1u,
                difference_transform<uint64*>( sorting_idx.second ) );

            optional_device_synchronize();
            nvbio::cuda::check_error("dedup kernel");

            flags_dvec[0] = 1u;

            // mark straddling seeds
            mark_straddling(
                hit_count,
                pipeline.idx_queue,
                genome_view.size(),
                genome_view.sequence_index(),
                pipeline.scoring_queues.hits,
                flags_dptr,
                params );

            optional_device_synchronize();
            nvbio::cuda::check_error("mark-straddling kernel");

            pipeline.idx_queue = (sorting_idx.first == idx_queue_dptr) ?
                idx_queue_dptr + BATCH_SIZE :
                idx_queue_dptr;

            // reset the hits queue size
            pipeline.hits_queue_size = nvbio::copy_flagged(
                hit_count,
                sorting_idx.first,
                flags_dptr,
                pipeline.idx_queue,
                temp_dvec );

            optional_device_synchronize();
            nvbio::cuda::check_error("copy-flagged kernel");

            log_debug_cont(stderr, " (%u/%u - %.2f)\n", pipeline.hits_queue_size, hit_count, float(pipeline.hits_queue_size)/float(hit_count));
        }
        else
            pipeline.idx_queue = sorting_idx.first;

        timer.stop();
        stats.sort.add( hit_count, timer.seconds() );

        //
        // assign a score to all selected hits
        //
        timer.start();

        log_debug(stderr, "    score\n");
        const uint32 n_alignments = nvbio::bowtie2::cuda::score_all(
            band_len,
            pipeline,
            params,
            buffer_offset + buffer_count,
            BATCH_SIZE*2 );

        optional_device_synchronize();
        nvbio::cuda::check_error("score kernel");

        timer.stop();
        stats.score.add( hit_count, timer.seconds() );

        total_alignments += n_alignments;
        log_verbose(stderr, "\r    alignments : %u (%.1fM) %.1f%%        ", n_alignments, float(total_alignments)*1.0e-6f, 100.0f * float(hit_offset + hit_count)/float(n_hits));
        log_debug_cont(stderr, "\n");

        buffer_count += n_alignments;

        //
        // perform backtracking and compute CIGARs for the found alignments
        //
        while (buffer_count)
        {
            // if it's not the very last pass, break out of the loop as
            // soon as we have less than BATCH_SIZE elements to process
            if (hit_offset + hit_count < n_hits)
            {
                if (buffer_count < BATCH_SIZE)
                    break;
            }

            timer.start();

            const uint32 n_backtracks = std::min(
                buffer_count,
                BATCH_SIZE );

            {
                // copy the read infos to the plain output buffer
                {
                    ring_buffer_to_plain_array(
                        buffer_read_info_dptr,
                        BATCH_SIZE*2,
                        buffer_offset,
                        buffer_offset + buffer_count,
                        output_read_info_dptr );

                    optional_device_synchronize();
                    nvbio::cuda::check_error("ring-buffer copy kernel");
                }

                // sort the alignments by read-id & RC flag to gain coherence
                thrust::copy(
                    thrust::make_counting_iterator(0u),
                    thrust::make_counting_iterator(0u) + n_backtracks,
                    idx_queue_dvec.begin() );

                thrust::sort_by_key(
                    output_read_info_dvec.begin(),
                    output_read_info_dvec.begin() + n_backtracks,
                    idx_queue_dvec.begin() );
            }

            // initialize cigars & MDS pools
            cigar.clear();
            mds.clear();

            log_debug(stderr, "    traceback (%u alignments)\n", n_backtracks);
            banded_traceback_all(
                n_backtracks,
                idx_queue_dptr,             // input indices
                buffer_offset,              // alignment ring-buffer offset
                BATCH_SIZE*2,               // alignment ring-buffer size
                output_alignments_dptr,     // output alignments
                band_len,                   // band length
                pipeline,                   // pipeline state
                params );                   // global params

            optional_device_synchronize();
            nvbio::cuda::check_error("backtracking kernel");

            if (cigar.has_overflown())
                throw nvbio::runtime_error("CIGAR vector overflow\n");

            log_debug(stderr, "    finish\n");
            finish_alignment_all(
                n_backtracks,
                idx_queue_dptr,             // input indices
                buffer_offset,              // alignment ring-buffer offset
                BATCH_SIZE*2,               // alignment ring-buffer size
                output_alignments_dptr,     // output alignments
                band_len,                   // band length
                pipeline,                   // pipeline state
                input_scoring_scheme.sw,    // always use Smith-Waterman to compute the final scores
                params );                   // global params

            optional_device_synchronize();
            nvbio::cuda::check_error("alignment kernel");

            if (mds.has_overflown())
                throw nvbio::runtime_error("MDS vector overflow\n");

            timer.stop();
            stats.backtrack.add( n_backtracks, timer.seconds() );

            // wrap the results in a DeviceOutputBatchSE and process it
            log_debug(stderr, "    output\n");
            {
                io::DeviceOutputBatchSE gpu_batch(
                    n_backtracks,
                    output_alignments_dvec,
                    io::DeviceCigarArray( cigar, cigar_coords_dvec ),
                    mds,
                    mapq_dvec,
                    output_read_info_dvec );

                cpu_batch.readback( gpu_batch );

                output_file->process( cpu_batch );
            }

            buffer_count -= n_backtracks;
            buffer_offset = (buffer_offset + n_backtracks) % (BATCH_SIZE*2);
        }
    }
    log_verbose_nl(stderr);

    log_debug(stderr, "  alignments: %llu\n", total_alignments);
}

} // namespace cuda
} // namespace bowtie2
} // namespace nvbio