NVBIO
Main Page
Modules
Classes
Examples
File List
File Members
All
Classes
Namespaces
Files
Functions
Variables
Typedefs
Enumerations
Enumerator
Friends
Macros
Groups
Pages
contrib
htslib
cram
thread_pool.h
Go to the documentation of this file.
1
/*
2
Copyright (c) 2013 Genome Research Ltd.
3
Author: James Bonfield <jkb@sanger.ac.uk>
4
5
Redistribution and use in source and binary forms, with or without
6
modification, are permitted provided that the following conditions are met:
7
8
1. Redistributions of source code must retain the above copyright notice,
9
this list of conditions and the following disclaimer.
10
11
2. Redistributions in binary form must reproduce the above copyright notice,
12
this list of conditions and the following disclaimer in the documentation
13
and/or other materials provided with the distribution.
14
15
3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
16
Institute nor the names of its contributors may be used to endorse or promote
17
products derived from this software without specific prior written permission.
18
19
THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
20
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22
DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
23
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
*/
30
31
/*
32
* This file implements a thread pool for multi-threading applications.
33
* It consists of two distinct interfaces: thread pools an results queues.
34
*
35
* The pool of threads is given a function pointer and void* data to pass in.
36
* This means the pool can run jobs of multiple types, albeit first come
37
* first served with no job scheduling.
38
*
39
* Upon completion, the return value from the function pointer is added to
40
* a results queue. We may have multiple queues in use for the one pool.
41
*
42
* An example: reading from BAM and writing to CRAM with 10 threads. We'll
43
* have a pool of 10 threads and two results queues holding decoded BAM blocks
44
* and encoded CRAM blocks respectively.
45
*/
46
47
#ifndef _THREAD_POOL_H_
48
#define _THREAD_POOL_H_
49
50
#include <pthread.h>
51
52
struct
t_pool
;
53
struct
t_results_queue
;
54
55
typedef
struct
t_pool_job
{
56
void
*(*func)(
void
*
arg
);
57
void
*
arg
;
58
struct
t_pool_job
*
next
;
59
60
struct
t_pool
*
p
;
61
struct
t_results_queue
*
q
;
62
int
serial
;
63
}
t_pool_job
;
64
65
typedef
struct
t_res
{
66
struct
t_res
*
next
;
67
int
serial
;
// sequential number for ordering
68
void
*
data
;
// result itself
69
}
t_pool_result
;
70
71
typedef
struct
t_pool
{
72
int
qsize
;
// size of queue
73
int
njobs
;
// pending job count
74
int
nwaiting
;
// how many workers waiting for new jobs
75
int
shutdown
;
// true if pool is being destroyed
76
77
// queue of pending jobs
78
t_pool_job
*
head
, *
tail
;
79
80
// threads
81
int
tsize
;
// maximum number of jobs
82
pthread_t *
t
;
83
84
// Mutexes
85
pthread_mutex_t
pool_m
;
// used when updating head/tail
86
87
pthread_cond_t
empty_c
;
88
pthread_cond_t
pending_c
;
// not empty
89
pthread_cond_t
full_c
;
90
91
// Debugging to check wait time
92
long
long
total_time
,
wait_time
;
93
}
t_pool
;
94
95
typedef
struct
t_results_queue
{
96
t_pool_result
*
result_head
;
97
t_pool_result
*
result_tail
;
98
int
next_serial
;
99
int
curr_serial
;
100
int
queue_len
;
// number of items in queue
101
int
pending
;
// number of pending items (in progress or in pool list)
102
pthread_mutex_t
result_m
;
103
pthread_cond_t
result_avail_c
;
104
}
t_results_queue
;
105
106
107
/*
108
* Creates a worker pool of length qsize with tsize worker threads.
109
*
110
* Returns pool pointer on success;
111
* NULL on failure
112
*/
113
t_pool
*
t_pool_init
(
int
qsize,
int
tsize);
114
115
/*
116
* Adds an item to the work pool.
117
*
118
* FIXME: Maybe return 1,0,-1 and distinguish between job dispathed vs
119
* result returned. Ie rather than blocking on full queue we're permitted
120
* to return early on "result available" event too.
121
* Caller would then have a while loop around t_pool_dispatch.
122
* Or, return -1 and set errno to E_AGAIN to indicate job not yet submitted.
123
*
124
* Returns 0 on success
125
* -1 on failure
126
*/
127
int
t_pool_dispatch
(
t_pool
*p,
t_results_queue
*q,
128
void
*(*func)(
void
*arg),
void
*arg);
129
int
t_pool_dispatch2
(
t_pool
*p,
t_results_queue
*q,
130
void
*(*func)(
void
*arg),
void
*arg,
int
nonblock);
131
132
/*
133
* Flushes the pool, but doesn't exit. This simply drains the queue and
134
* ensures all worker threads have finished their current task.
135
*
136
* Returns 0 on success;
137
* -1 on failure
138
*/
139
int
t_pool_flush
(
t_pool
*p);
140
141
/*
142
* Destroys a thread pool. If 'kill' is true the threads are terminated now,
143
* otherwise they are joined into the main thread so they will finish their
144
* current work load.
145
*
146
* Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
147
* t_pool_destroy(p,1) to quickly exit after a fatal error.
148
*/
149
void
t_pool_destroy
(
t_pool
*p,
int
kill);
150
151
/*
152
* Pulls a result off the head of the result queue. Caller should
153
* free it (and any internals as appropriate) after use. This doesn't
154
* wait for a result to be present.
155
*
156
* Results will be returned in strict order.
157
*
158
* Returns t_pool_result pointer if a result is ready.
159
* NULL if not.
160
*/
161
t_pool_result
*
t_pool_next_result
(
t_results_queue
*q);
162
t_pool_result
*
t_pool_next_result_wait
(
t_results_queue
*q);
163
164
/*
165
* Frees a result 'r' and if free_data is true also frees
166
* the internal r->data result too.
167
*/
168
void
t_pool_delete_result
(
t_pool_result
*r,
int
free_data);
169
170
/*
171
* Initialises a results queue.
172
*
173
* Results queue pointer on success;
174
* NULL on failure
175
*/
176
t_results_queue
*
t_results_queue_init
(
void
);
177
178
/* Deallocates memory for a results queue */
179
void
t_results_queue_destroy
(
t_results_queue
*q);
180
181
/*
182
* Returns true if there are no items on the finished results queue and
183
* also none still pending.
184
*/
185
int
t_pool_results_queue_empty
(
t_results_queue
*q);
186
187
/*
188
* Returns the number of completed jobs on the results queue.
189
*/
190
int
t_pool_results_queue_len
(
t_results_queue
*q);
191
192
/*
193
* Returns the number of completed jobs plus the number queued up to run.
194
*/
195
int
t_pool_results_queue_sz
(
t_results_queue
*q);
196
197
#endif
/* _THREAD_POOL_H_ */
Generated on Wed Feb 25 2015 08:32:47 for NVBIO by
1.8.4