NVBIO
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
pipeline_inl.h
Go to the documentation of this file.
1 /*
2  * nvbio
3  * Copyright (c) 2011-2014, NVIDIA CORPORATION. All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * * Redistributions of source code must retain the above copyright
8  * notice, this list of conditions and the following disclaimer.
9  * * Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  * * Neither the name of the NVIDIA CORPORATION nor the
13  * names of its contributors may be used to endorse or promote products
14  * derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19  * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
20  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 // pipeline_inl.h
29 //
30 
31 #pragma once
32 
33 namespace nvbio {
34 
35 namespace priv {
36 
/// Common base of all pipeline stage-threads. It stores the upstream stages
/// this thread reads from (m_deps) and declares the virtual interface
/// (run / fetch / release) that the concrete stage and sink threads override.
/// NOTE(review): m_clients and m_id are written below, but their declarations
/// (original lines 74-75) are missing from this listing. Confirm they are
/// initialized: m_clients in particular is only ever incremented here.
37 struct PipelineThreadBase : public Thread<PipelineThreadBase>
38 {
42 
/// Virtual, so that derived stage-threads can be deleted through a base pointer.
45  virtual ~PipelineThreadBase() {}
46 
/// Thread body; the base implementation does nothing.
/// Presumably invoked by Thread<> after create() - TODO confirm.
49  virtual void run() {}
50 
/// Return the output of item i produced by this thread; the base
/// implementation produces nothing and always returns NULL.
55  virtual void* fetch(const uint32 i) { return NULL; }
56 
/// Signal that one client is done reading item i; no-op in the base.
59  virtual void release(const uint32 i) {}
60 
/// Register an upstream stage whose outputs this thread consumes.
63  void add_dependency(PipelineThreadBase* dep) { m_deps.push_back( dep ); }
64 
/// Register one more downstream client that reads this thread's outputs.
67  void add_client() { ++m_clients; }
68 
/// Set the index of this thread inside its pipeline (used in debug logs).
71  void set_id(const uint32 id) { m_id = id; }
72 
/// Upstream stages, in the order they were registered; input i of the
/// PipelineContext comes from m_deps[i].
73  std::vector<PipelineThreadBase*> m_deps;
76 };
77 
/// Terminal stage of a pipeline. It is a thread that repeatedly:
/// - gathers one input from each upstream stage,
/// - passes them to SinkType::process(),
/// - releases them.
/// Unlike PipelineStageThread it publishes no outputs of its own.
/// NOTE(review): this listing drops original line 94 (presumably
/// `struct PipelineSinkThread : public PipelineThreadBase`) and line 151
/// (presumably the declaration of m_counter). Confirm against the real header.
93 template <typename SinkType>
95 {
96  typedef typename SinkType::argument_type argument_type;
97 
/// \param stage the user sink object; this class never deletes it.
/// NOTE(review): m_time is not initialized here. If ScopedTimer accumulates
/// into its target (TODO confirm), the reported time starts from garbage.
100  PipelineSinkThread(SinkType* stage) : m_stage( stage ), m_counter(0)
101  {}
102 
/// Thread body: keep consuming items until fill() returns false.
105  void run()
106  {
107  while (fill()) { /*yield();*/ }
108  }
109 
/// Consume the next item (number m_counter).
/// \return false if any upstream stage returned NULL from fetch() (end of
///         stream) or if the sink's process() returned false; true otherwise
112  bool fill()
113  {
114  // fetch the inputs from all sources
115  PipelineContext context;
116  for (uint32 i = 0; i < (uint32)m_deps.size(); ++i)
117  {
118  context.in[i] = m_deps[i]->fetch( m_counter );
119 
120  if (context.in[i] == NULL)
121  {
// only inputs 0..i-1 were actually acquired; the NULL one is not released
122  // release all inputs
123  for (uint32 j = 0; j < i; ++j)
124  m_deps[j]->release( m_counter );
125 
126  // signal completion
127  return false;
128  }
129  }
130 
131  // process
132  bool ret = false;
133  {
// the timer's scope covers only the call to process()
134  ScopedTimer<float> timer( &m_time );
135 
136  // execute this stage
137  ret = m_stage->process( context );
138  }
139 
// inputs are released even when process() returned false
140  // release all inputs
141  for (uint32 i = 0; i < (uint32)m_deps.size(); ++i)
142  m_deps[i]->release( m_counter );
143 
144  // advance the counter
145  m_counter++;
146 
147  return ret;
148  }
149 
/// the user sink (not owned)
150  SinkType* m_stage;
/// time spent inside m_stage->process(), as measured by ScopedTimer
152  float m_time;
153 };
154 
/// Intermediate pipeline stage. It is a thread that repeatedly:
/// - gathers one input from each upstream stage,
/// - runs StageType::process() on them,
/// - publishes the result into a ring of m_buffers output slots.
/// Downstream clients read those slots through fetch() and hand them back
/// through release().
///
/// Per-slot protocol:
/// - m_data_id[slot] == EMPTY_SLOT means the slot is free.
/// - The producer first writes m_data_ptr[slot] (and m_count[slot]), then
///   sets m_data_id[slot] to the item counter; fetch() polls for that value.
/// - The last client to release() the item resets the slot to EMPTY_SLOT.
/// - A NULL m_data_ptr marks end-of-stream.
///
/// NOTE(review): slots are indexed as `counter & (m_buffers-1)` and the
/// per-slot arrays hold 64 entries, so `buffers` must be a power of two no
/// larger than 64. The constructor does not check this. With buffers > 64 its
/// initialization loop writes past the end of m_data_id. TODO: add an assertion.
/// NOTE(review): this listing drops several original lines. Confirm them
/// against the real header:
/// - 172: the struct header, presumably
///   `struct PipelineStageThread : public PipelineThreadBase`;
/// - 187: a line in the constructor loop;
/// - 235, 263, 274, 303, 325: the memory-fence calls that the "make sure the
///   other threads see..." comments refer to;
/// - 330: the m_buffers declaration.
171 template <typename StageType>
173 {
/// sentinel stored in m_data_id for a slot that is free to be written
174  static const uint32 EMPTY_SLOT = uint32(-1);
175 
176  typedef typename StageType::argument_type argument_type;
177  typedef typename StageType::return_type return_type;
178 
/// \param stage   the user stage object; this class never deletes it
/// \param buffers number of output slots in the ring (see the power-of-two / 64 note above)
/// NOTE(review): m_time is not initialized here (see the ScopedTimer use in fill()).
181  PipelineStageThread(StageType* stage, const uint32 buffers) : m_stage( stage ), m_buffers( buffers )
182  {
183  m_data.resize( m_buffers );
184 
// mark every slot as free (one original line of this loop is missing from the listing)
185  for (uint32 i = 0; i < m_buffers; ++i)
186  {
188  m_data_id[i] = EMPTY_SLOT;
189  }
190 
191  m_counter = 0;
192  }
193 
/// Thread body: keep producing items until fill() returns false.
196  void run()
197  {
198  while (fill()) { /*yield();*/ }
199  }
200 
/// Produce item number m_counter into its ring slot.
/// \return false when an upstream stage reached end-of-stream or process()
///         returned false (in both cases a NULL end marker is published);
///         true otherwise
203  bool fill()
204  {
205  const uint32 slot = m_counter & (m_buffers-1);
206 
207  log_debug(stderr, " [%u] polling for writing [%u:%u]... started\n", m_id, m_counter, slot);
// spin until every client of the item that last used this slot has released it
208  // poll until the set is done reading & ready to be reused
209  while (m_data_id[ slot ] != EMPTY_SLOT)
210  {
211  yield();
212  }
213  log_debug(stderr, " [%u] polling for writing [%u:%u]... done\n", m_id, m_counter, slot);
214 
215  PipelineContext context;
216 
217  // set the output
218  context.out = &m_data[ slot ];
219 
220  // fetch the inputs from all sources
221  for (uint32 i = 0; i < (uint32)m_deps.size(); ++i)
222  {
223  context.in[i] = m_deps[i]->fetch( m_counter );
224 
225  if (context.in[i] == NULL)
226  {
// only inputs 0..i-1 were actually acquired; the NULL one is not released
227  // release all inputs
228  for (uint32 j = 0; j < i; ++j)
229  m_deps[j]->release( m_counter );
230 
// m_count is not set: clients that receive NULL never call release() on it
231  // mark this as an invalid entry & return
232  m_data_ptr[ slot ] = NULL;
233 
234  // make sure the other threads see this before the id is set
236 
237  m_data_id[ slot ] = m_counter;
238  return false;
239  }
240  }
241 
242  bool ret = false;
243  {
// the timer's scope covers only the call to process()
244  ScopedTimer<float> timer( &m_time );
245 
246  // execute this stage
247  ret = m_stage->process( context );
248  }
249 
250  // release all inputs
251  for (uint32 i = 0; i < (uint32)m_deps.size(); ++i)
252  m_deps[i]->release( m_counter );
253 
254  if (ret)
255  {
// release() frees the slot when its atomic_sub yields 0, so starting at
// m_clients-1 lets the last of the m_clients releases free it. This
// presumably relies on atomic_sub returning the pre-decrement value - TODO confirm.
// NOTE(review): with m_clients == 0 this wraps to uint32(-1) and the slot is never freed.
256  // set the reference counter
257  m_count[ slot ] = m_clients-1u;
258 
259  // mark the set as done
260  m_data_ptr[ slot ] = &m_data[ slot ];
261 
262  // make sure the other threads see the reference count before the output is set
264 
// publishing the id is what makes the item visible to fetch()
265  // mark the set as done
266  m_data_id[ slot ] = m_counter;
267  }
268  else
269  {
270  // mark this as an invalid entry
271  m_data_ptr[ slot ] = NULL;
272 
273  // make sure the other threads see this before the id is set
275 
276  m_data_id[ slot ] = m_counter;
277  return false;
278  }
279 
280  // switch to the next set
281  ++m_counter;
282  return true;
283  }
284 
/// Return the output of item i, spinning until the producer has published it.
/// \return a pointer to the item's output, or NULL if the stream ended at item i
/// Every non-NULL result must be matched by exactly one release(i) call.
291  void* fetch(const uint32 i)
292  {
293  const uint32 slot = i & (m_buffers-1);
294 
295  log_debug(stderr, " [%u] polling for reading [%u:%u]... started\n", m_id, i, slot);
296  // poll until the set is ready to be consumed
297  while (m_data_id[ slot ] != i)
298  {
299  yield();
300  }
301 
302  // make sure the other writes are seen
304 
305  log_debug(stderr, " [%u] polling for reading [%u:%u]... done\n", m_id, i, slot);
306 
307  return (void*)m_data_ptr[ slot ];
308  }
309 
/// Drop one client's reference to item i; the client that brings the count
/// to its final value marks the slot free so that fill() can reuse it.
312  void release(const uint32 i)
313  {
314  const uint32 slot = i & (m_buffers-1);
315 
316  const uint32 ref = atomic_sub( (uint32*)m_count + slot, 1u );
317  if (ref == 0)
318  {
319  log_debug(stderr, " [%u] release [%u:%u]\n", m_id, i, slot);
// EMPTY_SLOT cast to a pointer is only a "free" marker; it is never dereferenced here
320  // mark this set as free / ready to be written
321  m_data_ptr[ slot ] = (return_type*)EMPTY_SLOT;
322  m_data_id[ slot ] = EMPTY_SLOT;
323 
324  // make sure the other threads see this change
326  }
327  }
328 
/// the user stage (not owned)
329  StageType* m_stage;
/// output storage, one element per ring slot
331  std::vector<return_type> m_data;
/// per-slot published output: &m_data[slot], NULL for end-of-stream, or the EMPTY_SLOT marker
332  return_type* volatile m_data_ptr[64];
/// per-slot item counter currently held, or EMPTY_SLOT if the slot is free
333  uint32 volatile m_data_id[64];
/// per-slot number of outstanding client releases (see fill())
334  uint32 volatile m_count[64];
/// index of the next item to produce
335  volatile uint32 m_counter;
/// time spent inside m_stage->process(), as measured by ScopedTimer
336  float m_time;
337 };
338 
339 } // namespace priv
340 
341 // destroy the pipeline, deleting all of its stage-threads
342 //
// NOTE(review): the signature (original line 343) is missing from this listing;
// presumably `inline Pipeline::~Pipeline()`. The deletion goes through
// PipelineThreadBase's virtual destructor, and the threads are not joined here.
344 {
345  // delete all stage-threads
346  for (size_t i = 0; i < m_stages.size(); ++i)
347  delete m_stages[i];
348 }
349 
350 // append a new pipeline stage
351 //
// \param stage   the user stage object
// \param buffers number of output buffers of the new stage. It is presumably
//                forwarded to PipelineStageThread, whose slot indexing needs a
//                power of two no larger than 64 - TODO confirm.
// \return        the index of the new stage in m_stages, to be passed to add_dependency()
352 template <typename StageType>
353 uint32 Pipeline::append_stage(StageType* stage, const uint32 buffers)
354 {
// NOTE(review): the line declaring `thread` (original 356) is missing from this listing
355  // create a new stage-thread
357 
358  // append it
359  m_stages.push_back( thread );
360 
361  const uint32 id = (uint32)m_stages.size()-1;
362  thread->set_id( id );
363  return id;
364 }
365 
366 // append the pipeline sink
367 //
// \return the index of the sink in m_stages, to be passed to add_dependency()
// NOTE(review): the signature (original 369) and the line declaring `thread`
// (original 372) are missing from this listing; presumably the signature is
// `uint32 Pipeline::append_sink(SinkType* sink)` - TODO confirm.
368 template <typename SinkType>
370 {
371  // create a new stage-thread
373 
374  // append it
375  m_stages.push_back( thread );
376 
377  const uint32 id = (uint32)m_stages.size()-1;
378  thread->set_id( id );
379  return id;
380 }
381 
382 // add a dependency
383 //
384 inline void Pipeline::add_dependency(const uint32 in, const uint32 out)
385 {
386  m_stages[out]->add_dependency( m_stages[in] );
387  m_stages[in]->add_client();
388 }
389 
390 // run the pipeline to completion
391 //
392 inline void Pipeline::run()
393 {
394  // start all threads
395  for (size_t i = 0; i < m_stages.size(); ++i)
396  m_stages[i]->create();
397 
398  // and join them
399  for (size_t i = 0; i < m_stages.size(); ++i)
400  m_stages[i]->join();
401 }
402 
403 } // namespace nvbio