NVBIO
work_queue_ordered_inl.h
/*
 * nvbio
 * Copyright (c) 2011-2014, NVIDIA CORPORATION. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *    * Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *    * Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *      documentation and/or other materials provided with the distribution.
 *    * Neither the name of the NVIDIA CORPORATION nor the
 *      names of its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#pragma once

#include <nvbio/basic/types.h>
#include <nvbio/basic/numbers.h>
#include <nvbio/basic/cuda/arch.h>
#include <cub/cub.cuh>
#include <thrust/copy.h>

namespace nvbio {
namespace cuda {

namespace wq {

template <
    uint32   BLOCKDIM,
    typename WorkUnitT,
    typename WorkStreamT,
    typename WorkMoverT>
__global__
void work_queue_kernel(
    const uint32                                                    n_tile_grids,
    typename WorkQueue<OrderedQueueTag,WorkUnitT,BLOCKDIM>::Context context,
    const WorkStreamT                                               stream,
    const WorkMoverT                                                mover)
{
    //
    // This kernel consumes a stream of input tasks, each of which can either stop or produce
    // one more task, i.e. a continuation.
    // At each iteration, in order to maximize SIMT utilization while maintaining as much data
    // coherence as possible, the continuations are compacted into a queue that preserves the
    // input ordering for the next round of processing.
    // This means we essentially need an intra-kernel scan, or prefix sum, over the number of
    // continuations produced at each round, in order to locate the output slot of each
    // continuation.
    //
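    //
    // The user-supplied types are assumed to expose roughly the following device interfaces,
    // inferred from the calls made below (a sketch, not an authoritative definition):
    //
    //   WorkUnit:   bool   run(const WorkStream& stream);                      // true = produce a continuation
    //   WorkStream: uint32 size() const;
    //               void   get(uint32 stream_index, WorkUnit* unit, uint2 queue_slot) const;
    //   WorkMover:  void   move(const WorkStream& stream,
    //                           uint2 src_slot, WorkUnit* src,
    //                           uint2 dst_slot, WorkUnit* dst) const;
    //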
    typedef WorkUnitT WorkUnit;

    typedef cub::BlockReduce<uint32,BLOCKDIM> BlockReduce;
    typedef cub::BlockScan<uint32,BLOCKDIM>   BlockBitScan;

    const uint32 grid_threads   = gridDim.x * BLOCKDIM;
    const uint32 queue_capacity = grid_threads * n_tile_grids;
    const uint32 thread_id      = threadIdx.x + blockIdx.x*BLOCKDIM;

    uint32    work_queue_id0 = 0u;
    WorkUnit* work_queue0    = context.m_work_queue;
    WorkUnit* work_queue1    = context.m_work_queue + queue_capacity;
    volatile uint32* work_queue_size0 = context.m_work_queue_size;
    volatile uint32* work_queue_size1 = context.m_work_queue_size + 1;

    __shared__ typename BlockBitScan::TempStorage scan_smem_storage;
    __shared__ typename BlockReduce::TempStorage  reduce_smem_storage;

    const uint32 COND_THRESHOLD = 10000000;
    enum TileState {
        PARTIAL_READY = 1,
        PREFIX_READY  = 2,
    };
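    // Synchronization protocol: each tile owns one condition variable. A tile's condition is
    // raised to (iteration*2 + PARTIAL_READY) once its per-tile continuation count ('partial')
    // is visible in global memory, and to (iteration*2 + PREFIX_READY) once its exclusive prefix
    // is final. The iteration*2 offset keeps the target values strictly increasing, so a flag
    // raised in a previous round can never satisfy a wait issued in the current one.
    // COND_THRESHOLD bounds how long a wait() spins before reporting failure.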

    condition_set_view conditions = context.m_conditions;

    #define SINGLE_CTA_SCAN
    #if !defined(SINGLE_CTA_SCAN)
    __shared__ volatile uint32 previous_done;
    #else
    uint8* continuations = context.m_continuations;
    #endif

    //#define QUEUE_GATHER
    #if defined(QUEUE_GATHER)
    uint32* source_ids = context.m_source_ids;
    #endif

    volatile uint32* partials = context.m_partials;
    volatile uint32* prefixes = context.m_prefixes;

    const bool is_thread0 = (threadIdx.x == 0);

    uint32       stream_begin = 0;
    const uint32 stream_end   = stream.size();

    uint32 iteration = 0;

    while (1)
    {
        // make sure all threads see the same queue pointers
        context.m_syncblocks.enact();

        uint32 in_work_queue_size = *work_queue_size0;

        // try to load more work to do
        for (uint32 i = 0; i < n_tile_grids; ++i)
        {
            const uint32 work_begin = grid_threads * i;
            const uint32 work_id    = thread_id + work_begin;

            // if the work queue is not full, and there's remaining work in the stream, fetch some more
            if ((work_begin <= in_work_queue_size) &&
                (work_begin + grid_threads > in_work_queue_size) &&
                (stream_begin < stream_end))
            {
                const uint32 n_loaded = nvbio::min( stream_end - stream_begin, grid_threads - (in_work_queue_size - work_begin) );

                // fetch a new work unit
                if ((work_id >= in_work_queue_size) &&
                    (work_id - in_work_queue_size < n_loaded))
                    stream.get( stream_begin + work_id - in_work_queue_size, work_queue0 + work_id, make_uint2( work_id, work_queue_id0 ) );

                in_work_queue_size += n_loaded;
                stream_begin       += n_loaded;
            }
        }
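        // For example, with grid_threads = 1024 and 1500 surviving continuations, the grid
        // covering [0,1024) is already full, while the grid covering [1024,2048) is the partial
        // one: only its threads with work_id in [1500, 1500 + n_loaded) fetch fresh stream
        // items, so new work is appended right after the existing continuations.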

        // bail-out if there's no more work
        if (in_work_queue_size == 0)
            return;

        // check whether all threads are active
        NVBIO_CUDA_DEBUG_ASSERT( (__syncthreads_and(true) == true), "error: not all threads are active at block: %u\n", blockIdx.x );

        // compute how many tiles are needed to cover the input queue
        const uint32 n_active_tile_grids = (in_work_queue_size + grid_threads-1) / grid_threads;
        const uint32 n_active_tiles      = n_active_tile_grids * gridDim.x;

        #if defined(SINGLE_CTA_SCAN)
        //
        // This is essentially a scan skeleton that uses a single CTA (blockIdx.x == 0) to scan the
        // partials from the other CTAs as they arrive.
        //
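        //
        // The scheme runs in three phases per iteration:
        //   1. every CTA runs its work units, records a per-thread continuation flag, and
        //      publishes a per-tile continuation count ('partial') guarded by PARTIAL_READY;
        //   2. CTA 0 scans all the partials into exclusive per-tile prefixes, publishing each
        //      one under PREFIX_READY, and writes the total output queue size;
        //   3. every CTA waits for its tile's prefix, re-scans its continuation flags, and
        //      moves (or records, under QUEUE_GATHER) the surviving work units into the output
        //      queue in input order.
        //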

        // loop across the active tiles
        for (uint32 i = 0; i < n_active_tile_grids; ++i)
        {
            // make sure all threads get here at the same time
            __syncthreads();

            // compute the tile number
            const uint32 tile_idx = blockIdx.x + gridDim.x * i;
            const uint32 work_id  = thread_id  + grid_threads * i;

            bool has_continuation = false;

            // execute this thread's work unit
            if (work_id < in_work_queue_size)
                has_continuation = work_queue0[ work_id ].run( stream );

            continuations[ work_id ] = has_continuation ? 1u : 0u;

            // reduce the continuation flags in this block to compute their aggregate count
            const uint32 block_aggregate = BlockReduce( reduce_smem_storage ).Sum( has_continuation ? 1 : 0 );

            // write out the partial aggregate for this tile
            if (is_thread0)
            {
                partials[tile_idx] = block_aggregate;

                // set the partial ready flag
                conditions[tile_idx].set( iteration*2 + PARTIAL_READY );
            }
        }

        // scan the partials
        if (blockIdx.x == 0)
        {
            // make sure all threads get here at the same time
            __syncthreads();

            uint32 carry = 0;

            // we have to scan a grid's worth of block partials
            for (uint32 tile_begin = 0; tile_begin < n_active_tiles; tile_begin += BLOCKDIM)
            {
                const uint32 tile_x     = tile_begin + threadIdx.x;
                const bool   tile_valid = (tile_x < n_active_tiles);

                // check whether this tile is ready
                if (tile_valid)
                {
                    if (!conditions[tile_x].wait( iteration*2 + PARTIAL_READY, COND_THRESHOLD ))
                        printf("PARTIAL_READY test failed for tile %u/%u at iteration %u\n", tile_x, n_active_tiles, iteration);
                }

                const uint32 partial = tile_valid ? partials[tile_x] : 0u;

                uint32 partial_scan;
                uint32 partial_aggregate;

                // scan the partials
                BlockBitScan( scan_smem_storage ).ExclusiveSum( partial, partial_scan, partial_aggregate );

                // write the prefixes out
                if (tile_valid)
                {
                    prefixes[tile_x] = carry + partial_scan;
                    conditions[tile_x].set( iteration*2 + PREFIX_READY );
                }

                carry += partial_aggregate;
            }

            if (is_thread0)
                *work_queue_size1 = carry;

            // make sure all threads get here at the same time
            __syncthreads();
        }
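        // NOTE: once CTA 0 leaves the loop above, 'carry' equals the total number of
        // continuations, i.e. the size of the output queue stored in *work_queue_size1.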

        for (uint32 i = 0; i < n_active_tile_grids; ++i)
        {
            // make sure all threads get here at the same time
            __syncthreads();

            // compute the tile number
            const uint32 tile_idx = blockIdx.x + gridDim.x * i;
            const uint32 work_id  = thread_id  + grid_threads * i;

            // check whether this tile is ready
            if (!conditions[tile_idx].wait( iteration*2 + PREFIX_READY, COND_THRESHOLD ))
                printf("PREFIX_READY test failed for tile %u/%u at iteration %u\n", tile_idx, n_active_tiles, iteration);

            const uint32 prefix = prefixes[tile_idx];

            const uint32 has_continuation = continuations[ work_id ];

            uint32 block_scan;
            uint32 block_aggregate;

            // scan the number of continuations in this block and compute their aggregate count
            BlockBitScan( scan_smem_storage ).ExclusiveSum( has_continuation, block_scan, block_aggregate );

            // at this point this CTA knows where to output its continuations in the output queue
            if (has_continuation)
            {
                #if defined(QUEUE_GATHER)
                source_ids[ prefix + block_scan ] = work_id;
                #else
                // copy this work unit to its destination
                mover.move(
                    stream,
                    make_uint2( work_id, work_queue_id0 ),                       &work_queue0[ work_id ],
                    make_uint2( prefix + block_scan, work_queue_id0 ? 0u : 1u ), &work_queue1[ prefix + block_scan ] );
                #endif
            }
        }
        #else // !SINGLE_CTA_SCAN
        //
        // This is essentially a scan skeleton that performs CTA chaining using an adaptive lookback
        // strategy: the idea is that each CTA writes out its partial, and then checks for the
        // availability of its predecessor's prefix (without blocking); if that is not available, it
        // starts using all its threads to look for a ready predecessor prefix, and scans the
        // intermediate partials.
        //
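        //
        // For example, suppose tile 7 finds that tile 6's prefix is not ready yet: it publishes
        // its own PARTIAL_READY flag, then inspects up to BLOCKDIM predecessors in parallel.
        // If, say, tile 3 already has PREFIX_READY set, tile 7 adds prefixes[3] plus
        // partials[4..6] to obtain its own prefix without waiting for tile 6 to finish its scan.
        //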

        for (uint32 i = 0; i < n_tile_grids; ++i)
        {
            __syncthreads();

            // compute the tile number
            const uint32 tile_idx = blockIdx.x + gridDim.x * i;

            const uint32 work_id = thread_id + grid_threads * i;

            bool has_continuation = false;

            // execute this thread's work unit
            if (work_id < in_work_queue_size)
                has_continuation = work_queue0[ work_id ].run( stream );

            uint32 block_scan;
            uint32 block_aggregate;

            // scan the number of continuations in this block and compute their aggregate count
            BlockBitScan( scan_smem_storage ).ExclusiveSum( has_continuation ? 1 : 0, block_scan, block_aggregate );

            // write out the partial aggregate for this tile
            if (is_thread0)
                partials[tile_idx] = block_aggregate;

            // quickly check if the prefix for the previous tile is ready
            if (is_thread0 && tile_idx)
                previous_done = conditions[tile_idx-1].test( iteration*2 + PREFIX_READY );

            // make sure all threads see 'previous_done'
            __syncthreads();

            int32 prefix = 0;

            if (tile_idx == 0)
            {
                // set the value for the first tile
                if (is_thread0)
                    prefixes[0] = block_aggregate;
            }
            #if 0 // simple chaining
            else if (tile_idx)
            {
                if (is_thread0)
                {
                    if (!conditions[tile_idx-1].wait( iteration*2 + PREFIX_READY, COND_THRESHOLD ))
                        printf("PREFIX_READY test failed for tile %u/%u at iteration %u\n", tile_idx-1, n_active_tiles, iteration);

                    prefix = prefixes[tile_idx-1];

                    // sum to previous prefix
                    if (is_thread0)
                        prefixes[tile_idx] = prefix + block_aggregate;
                }
            }
            #else // adaptive lookback
            else if (previous_done)
            {
                prefix = prefixes[tile_idx-1];

                // sum to previous prefix
                if (is_thread0)
                    prefixes[tile_idx] = prefix + block_aggregate;
            }
            else
            {
                // release the condition variable for the partial
                if (is_thread0)
                    conditions[tile_idx].set( iteration*2 + PARTIAL_READY );

                int32 last_tile   = tile_idx;
                int32 prefix_tile = tile_idx;

                // keep looking back until we find a 'ready' prefix
                do
                {
                    //
                    // lookback up to BLOCKDIM predecessors in parallel, check if any
                    // of them is done (i.e. their prefix is ready), and otherwise
                    // wait on their partial to arrive.
                    //

                    previous_done = 0;
                    __syncthreads();

                    // compute the first tile in this batch
                    prefix_tile = nvbio::max( int32( prefix_tile - blockDim.x ), 0 );

                    // check whether any of the predecessors in this batch is done
                    if (prefix_tile + threadIdx.x < last_tile)
                    {
                        if (conditions[ prefix_tile + threadIdx.x ].test( iteration*2 + PREFIX_READY ))
                            previous_done = prefix_tile + threadIdx.x;
                    }

                    __syncthreads();

                    // let all threads update the prefix tile
                    if (previous_done)
                        prefix_tile = previous_done;

                    int32 partial = 0;

                    // lookback the predecessors in parallel
                    if (prefix_tile + threadIdx.x < last_tile)
                    {
                        if (previous_done && threadIdx.x == 0)
                        {
                            // let thread0 read the ready prefix
                            partial = prefixes[ prefix_tile ];
                        }
                        else
                        {
                            // wait on the partials
                            if (!conditions[ prefix_tile + threadIdx.x ].wait( iteration*2 + PARTIAL_READY, COND_THRESHOLD ))
                                printf("PARTIAL_READY test failed for tile %u at tile %u/%u at iteration %u\n", prefix_tile + threadIdx.x, tile_idx-1, n_active_tiles, iteration);

                            partial = partials[ prefix_tile + threadIdx.x ];
                        }
                    }

                    // reduce the gathered partials (and possibly one ready prefix) into the running prefix
                    prefix += BlockReduce( reduce_smem_storage ).Sum( partial );

                    last_tile = prefix_tile;
                }
                while (prefix_tile && !previous_done);

                if (is_thread0)
                {
                    // write out the final values
                    prefixes[tile_idx] = prefix + block_aggregate;
                }
            }
            #endif
            // share the prefix with the rest of the CTA
            __syncthreads();
            if (is_thread0)
                previous_done = prefix;
            __syncthreads();
            prefix = previous_done;

            // write the output queue size
            if (tile_idx == n_active_tiles-1 && is_thread0)
                *work_queue_size1 = prefix + block_aggregate;

            // at this point this CTA knows where to output its continuations in the output queue
            if (has_continuation)
            {
                #if defined(QUEUE_GATHER)
                source_ids[ prefix + block_scan ] = work_id;
                #else
                // copy this work unit to its destination
                mover.move(
                    stream,
                    make_uint2( work_id, work_queue_id0 ),                       &work_queue0[ work_id ],
                    make_uint2( prefix + block_scan, work_queue_id0 ? 0u : 1u ), &work_queue1[ prefix + block_scan ] );
                #endif
            }

            // release the condition for the scanned value for this tile
            if (is_thread0)
                conditions[tile_idx].set( iteration*2 + PREFIX_READY );
        }

        __syncthreads();

        // block all CTAs until the last tile has been processed
        if (!conditions[n_active_tiles-1].wait( iteration*2 + PREFIX_READY, COND_THRESHOLD ))
            printf("PREFIX_READY test failed for last tile (%u) at iteration %u\n", n_active_tiles-1, iteration);
        #endif // !SINGLE_CTA_SCAN

        // we can now swap the queue pointers
        {
            WorkUnit* tmp = work_queue0;
            work_queue0   = work_queue1;
            work_queue1   = tmp;
        }
        {
            volatile uint32* tmp = work_queue_size0;
            work_queue_size0     = work_queue_size1;
            work_queue_size1     = tmp;
        }
        work_queue_id0 = work_queue_id0 ? 0u : 1u;

        ++iteration;

        // gather the compacted data
        #if defined(QUEUE_GATHER)
        {
            // make sure all threads see the same queue pointers
            context.m_syncblocks.enact();

            // compute how many tiles are needed to cover the input queue
            const uint32 out_grid_size       = *work_queue_size0;
            const uint32 n_active_tile_grids = (out_grid_size + grid_threads-1) / grid_threads;

            for (uint32 i = 0; i < n_active_tile_grids; ++i)
            {
                // compute the tile number
                const uint32 work_id = thread_id + grid_threads * i;

                if (work_id < out_grid_size)
                {
                    // fetch the source id
                    const uint32 src_id = source_ids[ work_id ];

                    mover.move(
                        stream,
                        make_uint2( src_id,  work_queue_id0 ? 0u : 1u ), &work_queue1[ src_id ],
                        make_uint2( work_id, work_queue_id0 ? 1u : 0u ), &work_queue0[ work_id ] );
                }
            }
        }
        #endif // QUEUE_GATHER
    }
}

} // namespace wq

// consume a stream of work units
//
template <
    typename WorkUnitT,
    uint32   BLOCKDIM>
template <typename WorkStream, typename WorkMover>
void WorkQueue<OrderedQueueTag,WorkUnitT,BLOCKDIM>::consume(const WorkStream stream, const WorkMover mover)
{
    //const uint32 stream_size = stream.size();

    // compute the number of blocks we are going to launch
    const uint32 n_blocks     = (uint32)cuda::max_active_blocks( wq::work_queue_kernel<BLOCKDIM,WorkUnit,WorkStream,WorkMover>, BLOCKDIM, 0u );
    const uint32 grid_threads = n_blocks * BLOCKDIM;

    const uint32 n_tile_grids = m_capacity / grid_threads;
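    // the effective per-queue capacity is grid_threads * n_tile_grids, i.e. m_capacity rounded
    // down to a whole number of grid-sized tiles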
    m_condition_set.resize( n_blocks*n_tile_grids );
    m_partials.resize( n_blocks*n_tile_grids );
    m_prefixes.resize( n_blocks*n_tile_grids );
    m_continuations.resize( grid_threads*n_tile_grids );
    m_source_ids.resize( grid_threads*n_tile_grids );
    m_work_queue.resize( grid_threads*n_tile_grids * 2 );   // alloc space for an input and an output queue
    m_work_queue_size.resize( 2 );                          // alloc space for an input and an output queue size
    m_syncblocks.clear();

    m_work_queue_size[0] = 0;

    // launch the consuming kernel
    wq::work_queue_kernel<BLOCKDIM,WorkUnit,WorkStream,WorkMover> <<<n_blocks,BLOCKDIM>>>( n_tile_grids, get_context(), stream, mover );
}
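
// Usage sketch (hypothetical example types, not part of NVBIO; interfaces inferred from the
// kernel above):
//
//   struct CountdownUnit
//   {
//       uint32 counter;
//
//       template <typename Stream>
//       __device__ bool run(const Stream&) { return --counter > 0; }   // true = produce a continuation
//   };
//
//   struct CountdownStream
//   {
//       uint32 n;
//
//       __device__ uint32 size() const { return n; }
//       __device__ void   get(const uint32 i, CountdownUnit* unit, const uint2) const { unit->counter = (i % 8u) + 1u; }
//   };
//
//   struct CopyMover
//   {
//       template <typename Stream>
//       __device__ void move(const Stream&, const uint2, CountdownUnit* src, const uint2, CountdownUnit* dst) const { *dst = *src; }
//   };
//
//   // host side: an ordered work queue with 128-thread blocks, consuming the whole stream
//   //
//   //   nvbio::cuda::WorkQueue<nvbio::cuda::OrderedQueueTag, CountdownUnit, 128u> work_queue;
//   //   CountdownStream stream = { 1024u*1024u };
//   //   work_queue.consume( stream, CopyMover() );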

} // namespace cuda
} // namespace nvbio