nvbio/basic/pipeline_inl.h
/*
 * nvbio
 * Copyright (c) 2011-2014, NVIDIA CORPORATION. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *    * Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *    * Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *      documentation and/or other materials provided with the distribution.
 *    * Neither the name of the NVIDIA CORPORATION nor the
 *      names of its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

// pipeline_inl.h
//

#pragma once
namespace nvbio {

namespace priv {

// base class for all pipeline stage threads: it exposes the fetch/release
// protocol used to exchange data between stages, plus the dependency and
// client bookkeeping needed to wire the pipeline graph together
struct PipelineThreadBase : public Thread<PipelineThreadBase>
{
    // constructor
    PipelineThreadBase() : m_clients(0), m_id(0) {}

    // virtual destructor
    virtual ~PipelineThreadBase() {}

    // main thread function
    virtual void run() {}

    // fetch the i-th output of this stage; a NULL return signals that the stage has finished
    virtual void* fetch(const uint32 i) { return NULL; }

    // release the i-th output of this stage
    virtual void release(const uint32 i) {}

    // add a dependency, i.e. a stage this thread consumes from
    void add_dependency(PipelineThreadBase* dep) { m_deps.push_back( dep ); }

    // register a new client consuming this stage's output
    void add_client() { ++m_clients; }

    // set the stage id
    void set_id(const uint32 id) { m_id = id; }

    std::vector<PipelineThreadBase*> m_deps;
    uint32                           m_clients;
    uint32                           m_id;
};
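
// a worker thread wrapping the pipeline sink: it keeps fetching the inputs
// produced by its dependencies and feeding them to SinkType::process(),
// stopping as soon as a dependency runs dry or the sink itself returns false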
template <typename SinkType>
struct PipelineSinkThread : public PipelineThreadBase
{
    typedef typename SinkType::argument_type argument_type;

    PipelineSinkThread(SinkType* stage) : m_stage( stage ), m_counter(0)
    {}

    void run()
    {
        while (fill()) { /*yield();*/ }
    }

    bool fill()
    {
        // fetch the inputs from all sources
        PipelineContext context;
        for (uint32 i = 0; i < (uint32)m_deps.size(); ++i)
        {
            context.in[i] = m_deps[i]->fetch( m_counter );

            if (context.in[i] == NULL)
            {
                // release all inputs
                for (uint32 j = 0; j < i; ++j)
                    m_deps[j]->release( m_counter );

                // signal completion
                return false;
            }
        }

        // process
        bool ret = false;
        {
            ScopedTimer<float> timer( &m_time );

            // execute this stage
            ret = m_stage->process( context );
        }

        // release all inputs
        for (uint32 i = 0; i < (uint32)m_deps.size(); ++i)
            m_deps[i]->release( m_counter );

        // advance the counter
        m_counter++;

        return ret;
    }

    SinkType* m_stage;
    uint32    m_counter;
    float     m_time;
};
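
// a worker thread wrapping an intermediate pipeline stage: its outputs are
// published through a small ring of 'buffers' slots (the code assumes this
// count is a power of two, no larger than 64). fill() spins until the target
// slot has been released by all readers, runs the stage, and publishes the
// result by writing m_data_ptr[slot], issuing a release fence, and finally
// setting m_data_id[slot]; fetch() spins until m_data_id[slot] matches the
// requested counter and issues an acquire fence before reading, while
// release() decrements the slot's reference count and the last reader marks
// the slot free again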
template <typename StageType>
struct PipelineStageThread : public PipelineThreadBase
{
    static const uint32 EMPTY_SLOT = uint32(-1);

    typedef typename StageType::argument_type argument_type;
    typedef typename StageType::return_type   return_type;

    PipelineStageThread(StageType* stage, const uint32 buffers) : m_stage( stage ), m_buffers( buffers )
    {
        m_data.resize( m_buffers );

        for (uint32 i = 0; i < m_buffers; ++i)
        {
            m_data_ptr[i] = (return_type*)EMPTY_SLOT;
            m_data_id[i]  = EMPTY_SLOT;
        }

        m_counter = 0;
    }

    void run()
    {
        while (fill()) { /*yield();*/ }
    }

    bool fill()
    {
        const uint32 slot = m_counter & (m_buffers-1);

        log_debug(stderr, " [%u] polling for writing [%u:%u]... started\n", m_id, m_counter, slot);
        // poll until the set is done reading & ready to be reused
        while (m_data_id[ slot ] != EMPTY_SLOT)
        {
            yield();
        }
        log_debug(stderr, " [%u] polling for writing [%u:%u]... done\n", m_id, m_counter, slot);

        PipelineContext context;

        // set the output
        context.out = &m_data[ slot ];

        // fetch the inputs from all sources
        for (uint32 i = 0; i < (uint32)m_deps.size(); ++i)
        {
            context.in[i] = m_deps[i]->fetch( m_counter );

            if (context.in[i] == NULL)
            {
                // release all inputs
                for (uint32 j = 0; j < i; ++j)
                    m_deps[j]->release( m_counter );

                // mark this as an invalid entry & return
                m_data_ptr[ slot ] = NULL;

                // make sure the other threads see this before the id is set
                host_release_fence();

                m_data_id[ slot ] = m_counter;
                return false;
            }
        }

        bool ret = false;
        {
            ScopedTimer<float> timer( &m_time );

            // execute this stage
            ret = m_stage->process( context );
        }

        // release all inputs
        for (uint32 i = 0; i < (uint32)m_deps.size(); ++i)
            m_deps[i]->release( m_counter );

        if (ret)
        {
            // set the reference counter
            m_count[ slot ] = m_clients-1u;

            // mark the set as done
            m_data_ptr[ slot ] = &m_data[ slot ];

            // make sure the other threads see the reference count before the output is set
            host_release_fence();

            // mark the set as done
            m_data_id[ slot ] = m_counter;
        }
        else
        {
            // mark this as an invalid entry
            m_data_ptr[ slot ] = NULL;

            // make sure the other threads see this before the id is set
            host_release_fence();

            m_data_id[ slot ] = m_counter;
            return false;
        }

        // switch to the next set
        ++m_counter;
        return true;
    }

    void* fetch(const uint32 i)
    {
        const uint32 slot = i & (m_buffers-1);

        log_debug(stderr, " [%u] polling for reading [%u:%u]... started\n", m_id, i, slot);
        // poll until the set is ready to be consumed
        while (m_data_id[ slot ] != i)
        {
            yield();
        }

        // make sure the other writes are seen
        host_acquire_fence();

        log_debug(stderr, " [%u] polling for reading [%u:%u]... done\n", m_id, i, slot);

        return (void*)m_data_ptr[ slot ];
    }

    void release(const uint32 i)
    {
        const uint32 slot = i & (m_buffers-1);

        const uint32 ref = atomic_sub( (uint32*)m_count + slot, 1u );
        if (ref == 0)
        {
            log_debug(stderr, " [%u] release [%u:%u]\n", m_id, i, slot);
            // mark this set as free / ready to be written
            m_data_ptr[ slot ] = (return_type*)EMPTY_SLOT;
            m_data_id[ slot ]  = EMPTY_SLOT;

            // make sure the other threads see this change
            host_release_fence();
        }
    }

    StageType*               m_stage;
    uint32                   m_buffers;
    std::vector<return_type> m_data;
    return_type* volatile    m_data_ptr[64];
    uint32 volatile          m_data_id[64];
    uint32 volatile          m_count[64];
    volatile uint32          m_counter;
    float                    m_time;
};

} // namespace priv

// destructor: tear down the pipeline
//
inline Pipeline::~Pipeline()
{
    // delete all the stage threads
    for (size_t i = 0; i < m_stages.size(); ++i)
        delete m_stages[i];
}

// append a new pipeline stage
//
template <typename StageType>
uint32 Pipeline::append_stage(StageType* stage, const uint32 buffers)
{
    // create a new stage-thread
    priv::PipelineStageThread<StageType>* thread = new priv::PipelineStageThread<StageType>( stage, buffers );

    // append it
    m_stages.push_back( thread );

    const uint32 id = (uint32)m_stages.size()-1;
    thread->set_id( id );
    return id;
}

// append the pipeline sink
//
template <typename SinkType>
uint32 Pipeline::append_sink(SinkType* sink)
{
    // create a new stage-thread
    priv::PipelineSinkThread<SinkType>* thread = new priv::PipelineSinkThread<SinkType>( sink );

    // append it
    m_stages.push_back( thread );

    const uint32 id = (uint32)m_stages.size()-1;
    thread->set_id( id );
    return id;
}

// add a dependency: stage 'out' will consume the output of stage 'in'
//
inline void Pipeline::add_dependency(const uint32 in, const uint32 out)
{
    m_stages[out]->add_dependency( m_stages[in] );
    m_stages[in]->add_client();
}

// run the pipeline to completion
//
inline void Pipeline::run()
{
    // start all threads
    for (size_t i = 0; i < m_stages.size(); ++i)
        m_stages[i]->create();

    // and join them
    for (size_t i = 0; i < m_stages.size(); ++i)
        m_stages[i]->join();
}

} // namespace nvbio
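
// A minimal usage sketch (not part of the original source): assuming two
// hypothetical stage types, MySource and MySink, that define argument_type /
// return_type and a bool process(PipelineContext&) method compatible with the
// threads above, a two-stage pipeline could be assembled roughly like this:
//
//   MySource source;                 // hypothetical producer stage
//   MySink   sink;                   // hypothetical consumer stage
//
//   Pipeline pipeline;
//   const uint32 in  = pipeline.append_stage( &source, 4u ); // 4 ring-buffer slots (power of two)
//   const uint32 out = pipeline.append_sink( &sink );
//
//   pipeline.add_dependency( in, out );  // the sink consumes the source's output
//   pipeline.run();                      // create all stage threads and join them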