NVBIO
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
thread_pool.c
Go to the documentation of this file.
1 /*
2 Copyright (c) 2013 Genome Research Ltd.
3 Author: James Bonfield <jkb@sanger.ac.uk>
4 
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7 
8  1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10 
11  2. Redistributions in binary form must reproduce the above copyright notice,
12 this list of conditions and the following disclaimer in the documentation
13 and/or other materials provided with the distribution.
14 
15  3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
16 Institute nor the names of its contributors may be used to endorse or promote
17 products derived from this software without specific prior written permission.
18 
19 THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30 
31 #include <stdlib.h>
32 
33 #include <signal.h>
34 #include <errno.h>
35 #include <stdio.h>
36 #include <string.h>
37 #include <sys/time.h>
38 
39 #include "cram/thread_pool.h"
40 
41 //#define DEBUG
42 #define DEBUG_TIME
43 
#ifdef DEBUG
/*
 * Debug helper: returns the index of the calling thread within p->t[],
 * or -1 if the caller is not one of this pool's worker threads.
 */
static int worker_id(t_pool *p) {
    pthread_t self = pthread_self();
    int idx = 0;

    while (idx < p->tsize && !pthread_equal(self, p->t[idx]))
        idx++;

    return idx < p->tsize ? idx : -1;
}
#endif
55 
56 /* ----------------------------------------------------------------------------
57  * A queue to hold results from the thread pool.
58  *
59  * Each thread pool may have jobs of multiple types being queued up and
60  * interleaved, so we allow several results queues per pool.
61  *
62  * The jobs themselves are expected to push their results onto their
63  * appropriate results queue.
64  */
65 
66 /*
67  * Adds a result to the end of the result queue.
68  *
69  * Returns 0 on success;
70  * -1 on failure
71  */
72 static int t_pool_add_result(t_pool_job *j, void *data) {
73  t_results_queue *q = j->q;
74  t_pool_result *r;
75 
76 #ifdef DEBUG
77  fprintf(stderr, "%d: Adding resulting to queue %p, serial %d\n",
78  worker_id(j->p), q, j->serial);
79 #endif
80 
81  /* No results queue is fine if we don't want any results back */
82  if (!q)
83  return 0;
84 
85  if (!(r = malloc(sizeof(*r))))
86  return -1;
87 
88  r->next = NULL;
89  r->data = data;
90  r->serial = j->serial;
91 
92  pthread_mutex_lock(&q->result_m);
93  if (q->result_tail) {
94  q->result_tail->next = r;
95  q->result_tail = r;
96  } else {
97  q->result_head = q->result_tail = r;
98  }
99  q->queue_len++;
100  q->pending--;
101 
102 #ifdef DEBUG
103  fprintf(stderr, "%d: Broadcasting result_avail (id %d)\n",
104  worker_id(j->p), r->serial);
105 #endif
106  pthread_cond_broadcast(&q->result_avail_c);
107 #ifdef DEBUG
108  fprintf(stderr, "%d: Broadcast complete\n", worker_id(j->p));
109 #endif
110 
111  pthread_mutex_unlock(&q->result_m);
112 
113  return 0;
114 }
115 
116 /* Core of t_pool_next_result() */
117 static t_pool_result *t_pool_next_result_locked(t_results_queue *q) {
118  t_pool_result *r, *last;
119 
120  for (last = NULL, r = q->result_head; r; last = r, r = r->next) {
121  if (r->serial == q->next_serial)
122  break;
123  }
124 
125  if (r) {
126  if (q->result_head == r)
127  q->result_head = r->next;
128  else
129  last->next = r->next;
130 
131  if (q->result_tail == r)
132  q->result_tail = last;
133 
134  if (!q->result_head)
135  q->result_tail = NULL;
136 
137  q->next_serial++;
138  q->queue_len--;
139  }
140 
141  return r;
142 }
143 
144 /*
145  * Pulls a result off the head of the result queue. Caller should
146  * free it (and any internals as appropriate) after use. This doesn't
147  * wait for a result to be present.
148  *
149  * Results will be returned in strict order.
150  *
151  * Returns t_pool_result pointer if a result is ready.
152  * NULL if not.
153  */
155  t_pool_result *r;
156 
157 #ifdef DEBUG
158  fprintf(stderr, "Requesting next result on queue %p\n", q);
159 #endif
160 
161  pthread_mutex_lock(&q->result_m);
162  r = t_pool_next_result_locked(q);
163  pthread_mutex_unlock(&q->result_m);
164 
165 #ifdef DEBUG
166  fprintf(stderr, "(q=%p) Found %p\n", q, r);
167 #endif
168 
169  return r;
170 }
171 
173  t_pool_result *r;
174 
175 #ifdef DEBUG
176  fprintf(stderr, "Waiting for result %d...\n", q->next_serial);
177 #endif
178 
179  pthread_mutex_lock(&q->result_m);
180  while (!(r = t_pool_next_result_locked(q))) {
181  /* Possible race here now avoided via _locked() call, but incase... */
182  struct timeval now;
183  struct timespec timeout;
184 
185  gettimeofday(&now, NULL);
186  timeout.tv_sec = now.tv_sec + 10;
187  timeout.tv_nsec = now.tv_usec * 1000;
188 
189  pthread_cond_timedwait(&q->result_avail_c, &q->result_m, &timeout);
190  }
191  pthread_mutex_unlock(&q->result_m);
192 
193  return r;
194 }
195 
196 /*
197  * Returns true if there are no items on the finished results queue and
198  * also none still pending.
199  */
201  int empty;
202 
203  pthread_mutex_lock(&q->result_m);
204  empty = q->queue_len == 0 && q->pending == 0;
205  pthread_mutex_unlock(&q->result_m);
206 
207  return empty;
208 }
209 
210 
211 /*
212  * Returns the number of completed jobs on the results queue.
213  */
215  int len;
216 
217  pthread_mutex_lock(&q->result_m);
218  len = q->queue_len;
219  pthread_mutex_unlock(&q->result_m);
220 
221  return len;
222 }
223 
225  int len;
226 
227  pthread_mutex_lock(&q->result_m);
228  len = q->queue_len + q->pending;
229  pthread_mutex_unlock(&q->result_m);
230 
231  return len;
232 }
233 
234 /*
235  * Frees a result 'r' and if free_data is true also frees
236  * the internal r->data result too.
237  */
238 void t_pool_delete_result(t_pool_result *r, int free_data) {
239  if (!r)
240  return;
241 
242  if (free_data && r->data)
243  free(r->data);
244 
245  free(r);
246 }
247 
248 /*
249  * Initialises a results queue.
250  *
251  * Results queue pointer on success;
252  * NULL on failure
253  */
255  t_results_queue *q = malloc(sizeof(*q));
256 
257  pthread_mutex_init(&q->result_m, NULL);
258  pthread_cond_init(&q->result_avail_c, NULL);
259 
260  q->result_head = NULL;
261  q->result_tail = NULL;
262  q->next_serial = 0;
263  q->curr_serial = 0;
264  q->queue_len = 0;
265  q->pending = 0;
266 
267  return q;
268 }
269 
270 /* Deallocates memory for a results queue */
272 #ifdef DEBUG
273  fprintf(stderr, "Destroying results queue %p\n", q);
274 #endif
275 
276  if (!q)
277  return;
278 
279  pthread_mutex_destroy(&q->result_m);
280  pthread_cond_destroy(&q->result_avail_c);
281 
282  memset(q, 0xbb, sizeof(*q));
283  free(q);
284 
285 #ifdef DEBUG
286  fprintf(stderr, "Destroyed results queue %p\n", q);
287 #endif
288 }
289 
290 /* ----------------------------------------------------------------------------
291  * The thread pool.
292  */
293 
/*
 * Elapsed time from struct timeval t1 to t2, in microseconds.
 * Arguments are parenthesised so expressions such as *tp are accepted.
 */
#define TDIFF(t2,t1) (((t2).tv_sec - (t1).tv_sec) * 1000000 + \
                      ((t2).tv_usec - (t1).tv_usec))
295 
296 /*
297  * A worker thread.
298  *
299  * Each thread waits for the pool to be non-empty.
300  * As soon as this applies, one of them succeeds in getting the lock
301  * and then executes the job.
302  */
303 static void *t_pool_worker(void *arg) {
304  t_pool *p = (t_pool *)arg;
305  t_pool_job *j;
306 #ifdef DEBUG_TIME
307  struct timeval t1, t2, t3;
308 #endif
309 
310  for (;;) {
311  // Pop an item off the pool queue
312 #ifdef DEBUG_TIME
313  gettimeofday(&t1, NULL);
314 #endif
315 
316  pthread_mutex_lock(&p->pool_m);
317 
318 #ifdef DEBUG_TIME
319  gettimeofday(&t2, NULL);
320  p->wait_time += TDIFF(t2,t1);
321 #endif
322 
323  p->nwaiting++;
324  while (!p->head && !p->shutdown) {
325  if (p->njobs == 0)
326  pthread_cond_signal(&p->empty_c);
327 #ifdef DEBUG_TIME
328  gettimeofday(&t2, NULL);
329 #endif
330 
331  pthread_cond_wait(&p->pending_c, &p->pool_m);
332 
333 #ifdef DEBUG_TIME
334  gettimeofday(&t3, NULL);
335  p->wait_time += TDIFF(t3,t2);
336 #endif
337  }
338 
339  p->nwaiting--;
340 
341  if (p->shutdown) {
342  p->total_time += TDIFF(t3,t1);
343 #ifdef DEBUG
344  fprintf(stderr, "%d: Shutting down\n", worker_id(p));
345 #endif
346  pthread_mutex_unlock(&p->pool_m);
347  pthread_exit(NULL);
348  }
349 
350  j = p->head;
351  if (!(p->head = j->next))
352  p->tail = NULL;
353 
354  if (p->njobs-- == p->qsize)
355  pthread_cond_signal(&p->full_c);
356 
357  if (p->njobs == 0)
358  pthread_cond_signal(&p->empty_c);
359 
360  pthread_mutex_unlock(&p->pool_m);
361 
362  // We have job 'j' - now execute it.
363  t_pool_add_result(j, j->func(j->arg));
364 #ifdef DEBUG_TIME
365  pthread_mutex_lock(&p->pool_m);
366  gettimeofday(&t3, NULL);
367  p->total_time += TDIFF(t3,t1);
368  pthread_mutex_unlock(&p->pool_m);
369 #endif
370  memset(j, 0xbb, sizeof(*j));
371  free(j);
372  }
373 
374  return NULL;
375 }
376 
377 /*
378  * Creates a worker pool of length qsize with tsize worker threads.
379  *
380  * Returns pool pointer on success;
381  * NULL on failure
382  */
383 t_pool *t_pool_init(int qsize, int tsize) {
384  int i;
385  t_pool *p = malloc(sizeof(*p));
386  p->qsize = qsize;
387  p->tsize = tsize;
388  p->njobs = 0;
389  p->nwaiting = 0;
390  p->shutdown = 0;
391  p->head = p->tail = NULL;
392 #ifdef DEBUG_TIME
393  p->total_time = p->wait_time = 0;
394 #endif
395 
396  p->t = malloc(tsize * sizeof(p->t[0]));
397 
398  pthread_mutex_init(&p->pool_m, NULL);
399  pthread_cond_init(&p->empty_c, NULL);
400  pthread_cond_init(&p->pending_c, NULL);
401  pthread_cond_init(&p->full_c, NULL);
402 
403  for (i = 0; i < tsize; i++) {
404  if (0 != pthread_create(&p->t[i], NULL, t_pool_worker, p))
405  return NULL;
406  }
407 
408  return p;
409 }
410 
411 /*
412  * Adds an item to the work pool.
413  *
414  * FIXME: Maybe return 1,0,-1 and distinguish between job dispathed vs
415  * result returned. Ie rather than blocking on full queue we're permitted
416  * to return early on "result available" event too.
417  * Caller would then have a while loop around t_pool_dispatch.
418  * Or, return -1 and set errno to EAGAIN to indicate job not yet submitted.
419  *
420  * Returns 0 on success
421  * -1 on failure
422  */
424  void *(*func)(void *arg), void *arg) {
425  t_pool_job *j = malloc(sizeof(*j));
426 
427  if (!j)
428  return -1;
429  j->func = func;
430  j->arg = arg;
431  j->next = NULL;
432  j->p = p;
433  j->q = q;
434  if (q) {
435  pthread_mutex_lock(&q->result_m);
436  j->serial = q->curr_serial++;
437  q->pending++;
438  pthread_mutex_unlock(&q->result_m);
439  } else {
440  j->serial = 0;
441  }
442 
443 #ifdef DEBUG
444  fprintf(stderr, "Dispatching job %p for queue %p, serial %d\n", j, q, j->serial);
445 #endif
446 
447  pthread_mutex_lock(&p->pool_m);
448 
449  // Check if queue is full
450  while (p->njobs == p->qsize)
451  pthread_cond_wait(&p->full_c, &p->pool_m);
452 
453  p->njobs++;
454 
455  if (p->tail) {
456  p->tail->next = j;
457  p->tail = j;
458  } else {
459  p->head = p->tail = j;
460  }
461 
462  if (p->njobs == 1) {
463  // First job => tell all worker threads to start up
464  pthread_cond_broadcast(&p->pending_c);
465  }
466 
467  pthread_mutex_unlock(&p->pool_m);
468 
469 #ifdef DEBUG
470  fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
471 #endif
472 
473  return 0;
474 }
475 
476 /*
477  * As above but optional non-block flag.
478  *
479  * nonblock 0 => block if input queue is full
480  * nonblock +1 => don't block if input queue is full, but do not add task
481  * nonblock -1 => add task regardless of whether queue is full (over-size)
482  */
484  void *(*func)(void *arg), void *arg, int nonblock) {
485  t_pool_job *j = malloc(sizeof(*j));
486 
487  if (!j)
488  return -1;
489  j->func = func;
490  j->arg = arg;
491  j->next = NULL;
492  j->p = p;
493  j->q = q;
494  if (q) {
495  pthread_mutex_lock(&q->result_m);
496  j->serial = q->curr_serial;
497  pthread_mutex_unlock(&q->result_m);
498  } else {
499  j->serial = 0;
500  }
501 
502 #ifdef DEBUG
503  fprintf(stderr, "Dispatching job for queue %p, serial %d\n", q, j->serial);
504 #endif
505 
506  pthread_mutex_lock(&p->pool_m);
507 
508  if (p->njobs == p->qsize && nonblock == 1) {
509  pthread_mutex_unlock(&p->pool_m);
510  errno = EAGAIN;
511  free(j);
512  return -1;
513  }
514 
515  if (q) {
516  pthread_mutex_lock(&q->result_m);
517  q->curr_serial++;
518  q->pending++;
519  pthread_mutex_unlock(&q->result_m);
520  }
521 
522  // Check if queue is full
523  if (nonblock == 0)
524  while (p->njobs == p->qsize)
525  pthread_cond_wait(&p->full_c, &p->pool_m);
526 
527  p->njobs++;
528 
529 // if (q->curr_serial % 100 == 0)
530 // fprintf(stderr, "p->njobs = %d p->qsize = %d\n", p->njobs, p->qsize);
531 
532  if (p->tail) {
533  p->tail->next = j;
534  p->tail = j;
535  } else {
536  p->head = p->tail = j;
537  }
538 
539 #ifdef DEBUG
540  fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
541 #endif
542 
543  if (p->njobs == 1) {
544  // First job => tell all worker threads to start up
545  pthread_cond_broadcast(&p->pending_c);
546  }
547 
548  pthread_mutex_unlock(&p->pool_m);
549 
550  return 0;
551 }
552 
553 /*
554  * Flushes the pool, but doesn't exit. This simply drains the queue and
555  * ensures all worker threads have finished their current task.
556  *
557  * Returns 0 on success;
558  * -1 on failure
559  */
561 #ifdef DEBUG
562  fprintf(stderr, "Flushing pool %p\n", p);
563 #endif
564 
565  // Drains the queue
566  pthread_mutex_lock(&p->pool_m);
567  while (p->njobs || p->nwaiting != p->tsize)
568  pthread_cond_wait(&p->empty_c, &p->pool_m);
569 
570  pthread_mutex_unlock(&p->pool_m);
571 
572 #ifdef DEBUG
573  fprintf(stderr, "Flushed complete for pool %p, njobs=%d, nwaiting=%d\n",
574  p, p->njobs, p->nwaiting);
575 #endif
576 
577  return 0;
578 }
579 
580 /*
581  * Destroys a thread pool. If 'kill' is true the threads are terminated now,
582  * otherwise they are joined into the main thread so they will finish their
583  * current work load.
584  *
585  * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
586  * t_pool_destroy(p,1) to quickly exit after a fatal error.
587  */
588 void t_pool_destroy(t_pool *p, int kill) {
589  int i;
590 
591 #ifdef DEBUG
592  fprintf(stderr, "Destroying pool %p, kill=%d\n", p, kill);
593 #endif
594 
595  /* Send shutdown message to worker threads */
596  if (!kill) {
597  pthread_mutex_lock(&p->pool_m);
598  p->shutdown = 1;
599 
600 #ifdef DEBUG
601  fprintf(stderr, "Sending shutdown request\n");
602 #endif
603 
604  pthread_cond_broadcast(&p->pending_c);
605  pthread_mutex_unlock(&p->pool_m);
606 
607 #ifdef DEBUG
608  fprintf(stderr, "Shutdown complete\n");
609 #endif
610  for (i = 0; i < p->tsize; i++)
611  pthread_join(p->t[i], NULL);
612  } else {
613  for (i = 0; i < p->tsize; i++)
614  pthread_kill(p->t[i], SIGINT);
615  }
616 
617  pthread_mutex_destroy(&p->pool_m);
618  pthread_cond_destroy(&p->empty_c);
619  pthread_cond_destroy(&p->pending_c);
620  pthread_cond_destroy(&p->full_c);
621 
622 #ifdef DEBUG_TIME
623  fprintf(stderr, "Total time=%f\n", p->total_time / 1000000.0);
624  fprintf(stderr, "Wait time=%f\n", p->wait_time / 1000000.0);
625  fprintf(stderr, "%d%% utilisation\n",
626  (int)(100 - ((100.0 * p->wait_time) / p->total_time + 0.5)));
627 #endif
628 
629  free(p->t);
630  free(p);
631 
632 #ifdef DEBUG
633  fprintf(stderr, "Destroyed pool %p\n", p);
634 #endif
635 }
636 
637 
638 /*-----------------------------------------------------------------------------
639  * Test app.
640  */
641 
#ifdef TEST_MAIN

#include <stdio.h>
#include <math.h>
#include <unistd.h> /* usleep() */

/*
 * Test job. It sleeps for a random time (< 1s) so that jobs complete out
 * of order, then returns a malloc()ed int holding job*job, or NULL if
 * that allocation fails. It takes ownership of 'arg', a malloc()ed int
 * holding the job number, and frees it.
 */
void *doit(void *arg) {
    int i, k, x = 0;
    int job = *(int *)arg;
    int *res;

    printf("Worker: execute job %d\n", job);

    usleep(random() % 1000000); // to coerce job completion out of order
    if (0) {
        // Alternative CPU-bound workload, disabled by default.
        for (k = 0; k < 100; k++) {
            for (i = 0; i < 100000; i++) {
                x++;
                x += x * sin(i);
                x += x * cos(x);
            }
        }
        x *= 100;
        x += job;
    } else {
        x = job*job;
    }

    printf("Worker: job %d terminating, x=%d\n", job, x);

    free(arg);

    if (!(res = malloc(sizeof(*res))))
        return NULL;
    *res = x;

    return res;
}

/* Prints one result produced by doit() and releases it with its data. */
static void report_result(t_pool_result *r) {
    if (r->data)
        printf("RESULT: %d\n", *(int *)r->data);
    t_pool_delete_result(r, 1);
}

#define NTHREADS 8

int main(int argc, char **argv) {
    t_pool *p = t_pool_init(NTHREADS*2, NTHREADS);
    t_results_queue *q = t_results_queue_init();
    int i;
    t_pool_result *r;

    (void)argc;
    (void)argv;

    if (!p || !q) {
        fprintf(stderr, "Failed to initialise thread pool or results queue\n");
        if (p)
            t_pool_destroy(p, 0);
        t_results_queue_destroy(q);
        return 1;
    }

    // Dispatch jobs
    for (i = 0; i < 20; i++) {
        int *ip = malloc(sizeof(*ip));
        if (!ip)
            break;
        *ip = i;
        printf("Submitting %d\n", i);
        if (t_pool_dispatch(p, q, doit, ip) != 0) {
            free(ip);
            break;
        }

        // Check for results
        if ((r = t_pool_next_result(q)))
            report_result(r);
    }

    t_pool_flush(p);

    while ((r = t_pool_next_result(q)))
        report_result(r);

    t_pool_destroy(p, 0);
    t_results_queue_destroy(q);

    return 0;
}
#endif