NVBIO
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
thread_pool.h
Go to the documentation of this file.
1 /*
2 Copyright (c) 2013 Genome Research Ltd.
3 Author: James Bonfield <jkb@sanger.ac.uk>
4 
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7 
8  1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10 
11  2. Redistributions in binary form must reproduce the above copyright notice,
12 this list of conditions and the following disclaimer in the documentation
13 and/or other materials provided with the distribution.
14 
15  3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
16 Institute nor the names of its contributors may be used to endorse or promote
17 products derived from this software without specific prior written permission.
18 
19 THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30 
31 /*
32  * This file implements a thread pool for multi-threading applications.
33  * It consists of two distinct interfaces: thread pools and results queues.
34  *
35  * The pool of threads is given a function pointer and void* data to pass in.
36  * This means the pool can run jobs of multiple types, albeit first come
37  * first served with no job scheduling.
38  *
39  * Upon completion, the return value from the function pointer is added to
40  * a results queue. We may have multiple queues in use for the one pool.
41  *
42  * An example: reading from BAM and writing to CRAM with 10 threads. We'll
43  * have a pool of 10 threads and two results queues holding decoded BAM blocks
44  * and encoded CRAM blocks respectively.
45  */
46 
47 #ifndef _THREAD_POOL_H_
48 #define _THREAD_POOL_H_
49 
50 #include <pthread.h>
51 
/* Forward declarations so t_pool_job can point at its pool and results queue. */
struct t_pool;
struct t_results_queue;
54 
/*
 * A single unit of work submitted to a thread pool.
 *
 * The pool is handed a function pointer plus a void* argument; the value
 * returned by the function is what ends up on a results queue.
 */
typedef struct t_pool_job {
    void *(*func)(void *arg);   // function to execute for this job
    void *arg;                  // opaque argument handed to func
    struct t_pool_job *next;    // next job in the linked list

    struct t_pool *p;           // pool associated with this job
    struct t_results_queue *q;  // results queue associated with this job
    int serial;                 // presumably matches t_res.serial for ordering - TODO confirm
} t_pool_job;
64 
/*
 * One job result, stored as a node of a singly linked list.
 *
 * NOTE(review): the closing line of this typedef was missing from the
 * listing; the name t_pool_result is taken from t_pool_delete_result().
 */
typedef struct t_res {
    struct t_res *next; // next result in the list
    int serial;         // sequential number for ordering
    void *data;         // result itself
} t_pool_result;
70 
/*
 * State of a thread pool: the pending job list, the worker threads and
 * the synchronisation objects that guard them.
 */
typedef struct t_pool {
    int qsize;    // size of queue
    int njobs;    // pending job count
    int nwaiting; // how many workers waiting for new jobs
    int shutdown; // true if pool is being destroyed

    // queue of pending jobs
    // NOTE(review): this member line was missing from the listing; it is
    // restored from the "head/tail" reference on pool_m below - confirm.
    struct t_pool_job *head, *tail;

    // threads
    int tsize;    // number of worker threads (see t_pool_init)
    pthread_t *t;

    // Mutexes
    pthread_mutex_t pool_m; // used when updating head/tail

    pthread_cond_t empty_c;
    pthread_cond_t pending_c; // not empty
    pthread_cond_t full_c;

    // Debugging to check wait time
    long long total_time, wait_time;
} t_pool;
94 
/*
 * A queue of completed job results, protected by its own mutex and
 * condition variable.
 *
 * NOTE(review): the first four members and the closing line of this
 * typedef were missing from the listing. They are reconstructed here as a
 * head/tail pair for the result list plus two serial counters - confirm
 * names and order against the implementation before relying on them.
 */
typedef struct t_results_queue {
    struct t_res *result_head; // reconstructed - TODO confirm
    struct t_res *result_tail; // reconstructed - TODO confirm
    int next_serial;           // reconstructed - TODO confirm
    int curr_serial;           // reconstructed - TODO confirm
    int queue_len; // number of items in queue
    int pending;   // number of pending items (in progress or in pool list)
    pthread_mutex_t result_m;
    pthread_cond_t result_avail_c;
} t_results_queue;
105 
106 
107 /*
108  * Creates a worker pool of length qsize with tsize worker threads.
109  *
110  * Returns pool pointer on success;
111  * NULL on failure
112  */
113 t_pool *t_pool_init(int qsize, int tsize);
114 
115 /*
116  * Adds an item to the work pool.
117  *
118  * FIXME: Maybe return 1,0,-1 and distinguish between job dispathed vs
119  * result returned. Ie rather than blocking on full queue we're permitted
120  * to return early on "result available" event too.
121  * Caller would then have a while loop around t_pool_dispatch.
122  * Or, return -1 and set errno to E_AGAIN to indicate job not yet submitted.
123  *
124  * Returns 0 on success
125  * -1 on failure
126  */
128  void *(*func)(void *arg), void *arg);
130  void *(*func)(void *arg), void *arg, int nonblock);
131 
132 /*
133  * Flushes the pool, but doesn't exit. This simply drains the queue and
134  * ensures all worker threads have finished their current task.
135  *
136  * Returns 0 on success;
137  * -1 on failure
138  */
139 int t_pool_flush(t_pool *p);
140 
141 /*
142  * Destroys a thread pool. If 'kill' is true the threads are terminated now,
143  * otherwise they are joined into the main thread so they will finish their
144  * current work load.
145  *
146  * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
147  * t_pool_destroy(p,1) to quickly exit after a fatal error.
148  */
149 void t_pool_destroy(t_pool *p, int kill);
150 
151 /*
152  * Pulls a result off the head of the result queue. Caller should
153  * free it (and any internals as appropriate) after use. This doesn't
154  * wait for a result to be present.
155  *
156  * Results will be returned in strict order.
157  *
158  * Returns t_pool_result pointer if a result is ready.
159  * NULL if not.
160  */
163 
164 /*
165  * Frees a result 'r' and if free_data is true also frees
166  * the internal r->data result too.
167  */
168 void t_pool_delete_result(t_pool_result *r, int free_data);
169 
170 /*
171  * Initialises a results queue.
172  *
173  * Results queue pointer on success;
174  * NULL on failure
175  */
177 
178 /* Deallocates memory for a results queue */
180 
181 /*
182  * Returns true if there are no items on the finished results queue and
183  * also none still pending.
184  */
186 
187 /*
188  * Returns the number of completed jobs on the results queue.
189  */
191 
192 /*
193  * Returns the number of completed jobs plus the number queued up to run.
194  */
196 
197 #endif /* _THREAD_POOL_H_ */