NVBIO
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
input_thread.cpp
Go to the documentation of this file.
1 /*
2  * nvbio
3  * Copyright (c) 2011-2014, NVIDIA CORPORATION. All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * * Redistributions of source code must retain the above copyright
8  * notice, this list of conditions and the following disclaimer.
9  * * Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  * * Neither the name of the NVIDIA CORPORATION nor the
13  * names of its contributors may be used to endorse or promote products
14  * derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19  * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
20  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
33 #include <nvbio/basic/threads.h>
34 #include <nvbio/basic/atomics.h>
35 #include <nvbio/basic/timer.h>
36 #include <nvbio/basic/exceptions.h>
37 
38 namespace nvbio {
39 namespace bowtie2 {
40 namespace cuda {
41 
43 {
44  log_verbose( stderr, "starting background input thread\n" );
45 
46  try
47  {
48  // fill up the free pool
49  {
50  ScopedLock lock( &m_free_pool_lock );
51  for (uint32 i = 0; i < BUFFERS; ++i)
52  m_free_pool.push( &m_read_data_storage[i] );
53  }
54 
55  while (1u)
56  {
57  io::SequenceDataHost* read_data = NULL;
58 
59  // loop until the free pool gets filled
60  while (read_data == NULL)
61  {
62  ScopedLock lock( &m_free_pool_lock );
63 
64  if (m_free_pool.empty() == false)
65  {
66  read_data = m_free_pool.top();
67  m_free_pool.pop();
68  }
69 
70  yield();
71  }
72 
73  log_debug( stderr, " reading input batch %u\n", m_set );
74 
75  Timer timer;
76  timer.start();
77 
78  const int ret = io::next( DNA_N, read_data, m_read_data_stream, m_batch_size, m_batch_size*m_read_length );
79 
80  timer.stop();
81 
82  if (ret)
83  {
84  ScopedLock lock( &m_ready_pool_lock );
85  m_ready_pool.push_front( read_data );
86  m_ready_poolN.push_front( m_reads );
87 
88  m_reads += read_data->size();
89 
90  m_stats.read_io.add( read_data->size(), timer.seconds() );
91  }
92  else
93  {
94  // stop the thread
95  m_done = true;
97  break;
98  }
99 
100  // switch to the next set
101  ++m_set;
102  }
103  }
104  catch (nvbio::bad_alloc e)
105  {
106  log_error(stderr, "caught a nvbio::bad_alloc exception:\n");
107  log_error(stderr, " %s\n", e.what());
108  exit(1);
109  }
110  catch (nvbio::logic_error e)
111  {
112  log_error(stderr, "caught a nvbio::logic_error exception:\n");
113  log_error(stderr, " %s\n", e.what());
114  exit(1);
115  }
116  catch (nvbio::runtime_error e)
117  {
118  log_error(stderr, "caught a nvbio::runtime_error exception:\n");
119  log_error(stderr, " %s\n", e.what());
120  exit(1);
121  }
122  catch (std::bad_alloc e)
123  {
124  log_error(stderr, "caught a std::bad_alloc exception:\n");
125  log_error(stderr, " %s\n", e.what());
126  exit(1);
127  }
128  catch (std::logic_error e)
129  {
130  log_error(stderr, "caught a std::logic_error exception:\n");
131  log_error(stderr, " %s\n", e.what());
132  exit(1);
133  }
134  catch (std::runtime_error e)
135  {
136  log_error(stderr, "caught a std::runtime_error exception:\n");
137  log_error(stderr, " %s\n", e.what());
138  exit(1);
139  }
140  catch (...)
141  {
142  log_error(stderr, "caught an unknown exception!\n");
143  exit(1);
144  }
145 }
146 
147 // get a batch
148 //
150 {
151  // loop until the ready pool gets filled
152  while (1)
153  {
154  ScopedLock lock( &m_ready_pool_lock );
155 
156  if (m_ready_pool.empty() == false)
157  {
158  // pop from the ready pool
159  io::SequenceDataHost* read_data = m_ready_pool.back();
160  m_ready_pool.pop_back();
161  if (offset) *offset = m_ready_poolN.back();
162  m_ready_poolN.pop_back();
163  return read_data;
164  }
165  else if (m_done)
166  {
167  if (offset) *offset = m_reads;
168  return NULL;
169  }
170 
171  yield();
172  }
173 }
174 
175 // release a batch
176 //
178 {
179  // push back to the free pool
180  ScopedLock lock( &m_free_pool_lock );
181  m_free_pool.push( read_data );
182 }
183 
185 {
186  log_verbose( stderr, "starting background paired-end input thread\n" );
187 
188  try
189  {
190  // fill up the free pool
191  {
192  ScopedLock lock( &m_free_pool_lock );
193  for (uint32 i = 0; i < BUFFERS; ++i)
194  {
195  m_free_pool1.push( &m_read_data_storage1[i] );
196  m_free_pool2.push( &m_read_data_storage2[i] );
197  }
198  }
199 
200  while (1u)
201  {
202  io::SequenceDataHost* read_data1 = NULL;
203  io::SequenceDataHost* read_data2 = NULL;
204 
205  // loop until the free pool gets filled
206  while (read_data1 == NULL || read_data2 == NULL)
207  {
208  ScopedLock lock( &m_free_pool_lock );
209 
210  if (m_free_pool1.empty() == false &&
211  m_free_pool2.empty() == false)
212  {
213  read_data1 = m_free_pool1.top(); m_free_pool1.pop();
214  read_data2 = m_free_pool2.top(); m_free_pool2.pop();
215  }
216 
217  yield();
218  }
219 
220  log_debug( stderr, " reading input batch %u\n", m_set );
221 
222  Timer timer;
223  timer.start();
224 
225  const int ret1 = io::next( DNA_N, read_data1, m_read_data_stream1, m_batch_size, m_batch_size*m_read_length );
226  const int ret2 = io::next( DNA_N, read_data2, m_read_data_stream2, read_data1->size() );
227 
228  timer.stop();
229 
230  if (ret1 && ret2)
231  {
232  ScopedLock lock( &m_ready_pool_lock );
233  m_ready_pool1.push_front( read_data1 );
234  m_ready_pool2.push_front( read_data2 );
235  m_ready_poolN.push_front( m_reads );
236 
237  m_reads += read_data1->size();
238 
239  m_stats.read_io.add( read_data1->size(), timer.seconds() );
240  }
241  else
242  {
243  // stop the thread
244  m_done = true;
246  break;
247  }
248 
249  // switch to the next set
250  ++m_set;
251  }
252  }
253  catch (nvbio::bad_alloc e)
254  {
255  log_error(stderr, "caught a nvbio::bad_alloc exception:\n");
256  log_error(stderr, " %s\n", e.what());
257  exit(1);
258  }
259  catch (nvbio::logic_error e)
260  {
261  log_error(stderr, "caught a nvbio::logic_error exception:\n");
262  log_error(stderr, " %s\n", e.what());
263  exit(1);
264  }
265  catch (nvbio::runtime_error e)
266  {
267  log_error(stderr, "caught a nvbio::runtime_error exception:\n");
268  log_error(stderr, " %s\n", e.what());
269  exit(1);
270  }
271  catch (std::bad_alloc e)
272  {
273  log_error(stderr, "caught a std::bad_alloc exception:\n");
274  log_error(stderr, " %s\n", e.what());
275  exit(1);
276  }
277  catch (std::logic_error e)
278  {
279  log_error(stderr, "caught a std::logic_error exception:\n");
280  log_error(stderr, " %s\n", e.what());
281  exit(1);
282  }
283  catch (std::runtime_error e)
284  {
285  log_error(stderr, "caught a std::runtime_error exception:\n");
286  log_error(stderr, " %s\n", e.what());
287  exit(1);
288  }
289  catch (...)
290  {
291  log_error(stderr, "caught an unknown exception!\n");
292  exit(1);
293  }
294 }
295 
296 // get a batch
297 //
298 std::pair<io::SequenceDataHost*,io::SequenceDataHost*> InputThreadPE::next(uint32* offset)
299 {
300  // loop until the ready pool gets filled
301  while (1)
302  {
303  ScopedLock lock( &m_ready_pool_lock );
304 
305  if (m_ready_pool1.empty() == false &&
306  m_ready_pool2.empty() == false)
307  {
308  // pop from the ready pool
309  std::pair<io::SequenceDataHost*,io::SequenceDataHost*> read_data;
310  read_data.first = m_ready_pool1.back(); m_ready_pool1.pop_back();
311  read_data.second = m_ready_pool2.back(); m_ready_pool2.pop_back();
312  if (offset) *offset = m_ready_poolN.back();
313  m_ready_poolN.pop_back();
314  return read_data;
315  }
316  else if (m_done)
317  {
318  if (offset) *offset = m_reads;
319  return std::pair<io::SequenceDataHost*,io::SequenceDataHost*>( NULL, NULL );
320  }
321 
322  yield();
323 
324  }
325 }
326 
327 // release a batch
328 //
329 void InputThreadPE::release(std::pair<io::SequenceDataHost*,io::SequenceDataHost*> read_data)
330 {
331  // push back to the free pool
332  ScopedLock lock( &m_free_pool_lock );
333  m_free_pool1.push( read_data.first );
334  m_free_pool2.push( read_data.second );
335 }
336 
337 } // namespace cuda
338 } // namespace bowtie2
339 } // namespace nvbio