45 static int worker_id(
t_pool *p) {
47 pthread_t s = pthread_self();
48 for (i = 0; i < p->
tsize; i++) {
49 if (pthread_equal(s, p->
t[i]))
72 static int t_pool_add_result(
t_pool_job *j,
void *data) {
77 fprintf(stderr,
"%d: Adding resulting to queue %p, serial %d\n",
85 if (!(r = malloc(
sizeof(*r))))
103 fprintf(stderr,
"%d: Broadcasting result_avail (id %d)\n",
108 fprintf(stderr,
"%d: Broadcast complete\n", worker_id(j->
p));
158 fprintf(stderr,
"Requesting next result on queue %p\n", q);
162 r = t_pool_next_result_locked(q);
166 fprintf(stderr,
"(q=%p) Found %p\n", q, r);
176 fprintf(stderr,
"Waiting for result %d...\n", q->
next_serial);
180 while (!(r = t_pool_next_result_locked(q))) {
183 struct timespec timeout;
185 gettimeofday(&now, NULL);
186 timeout.tv_sec = now.tv_sec + 10;
187 timeout.tv_nsec = now.tv_usec * 1000;
242 if (free_data && r->
data)
257 pthread_mutex_init(&q->
result_m, NULL);
273 fprintf(stderr,
"Destroying results queue %p\n", q);
279 pthread_mutex_destroy(&q->
result_m);
282 memset(q, 0xbb,
sizeof(*q));
286 fprintf(stderr,
"Destroyed results queue %p\n", q);
/*
 * TDIFF(t2, t1): elapsed time from t1 to t2, in microseconds.
 *
 * Both arguments are objects with tv_sec and tv_usec members (e.g. the
 * struct timeval values filled in by gettimeofday()). The result is
 * (seconds difference * 1000000) + (microseconds difference), computed in
 * the natural arithmetic type of those members.
 *
 * Each argument is parenthesized so that expression arguments such as
 * *ptr or arr[i] expand correctly. Note that each argument is evaluated
 * twice, so it must not have side effects.
 */
#define TDIFF(t2,t1) \
    (((t2).tv_sec - (t1).tv_sec) * 1000000 + (t2).tv_usec - (t1).tv_usec)
303 static void *t_pool_worker(
void *arg) {
307 struct timeval t1, t2, t3;
313 gettimeofday(&t1, NULL);
316 pthread_mutex_lock(&p->
pool_m);
319 gettimeofday(&t2, NULL);
326 pthread_cond_signal(&p->
empty_c);
328 gettimeofday(&t2, NULL);
334 gettimeofday(&t3, NULL);
344 fprintf(stderr,
"%d: Shutting down\n", worker_id(p));
346 pthread_mutex_unlock(&p->
pool_m);
355 pthread_cond_signal(&p->
full_c);
358 pthread_cond_signal(&p->
empty_c);
360 pthread_mutex_unlock(&p->
pool_m);
363 t_pool_add_result(j, j->
func(j->
arg));
365 pthread_mutex_lock(&p->
pool_m);
366 gettimeofday(&t3, NULL);
368 pthread_mutex_unlock(&p->
pool_m);
370 memset(j, 0xbb,
sizeof(*j));
385 t_pool *p = malloc(
sizeof(*p));
396 p->
t = malloc(tsize *
sizeof(p->
t[0]));
398 pthread_mutex_init(&p->
pool_m, NULL);
399 pthread_cond_init(&p->
empty_c, NULL);
401 pthread_cond_init(&p->
full_c, NULL);
403 for (i = 0; i < tsize; i++) {
404 if (0 != pthread_create(&p->
t[i], NULL, t_pool_worker, p))
424 void *(*func)(
void *arg),
void *arg) {
444 fprintf(stderr,
"Dispatching job %p for queue %p, serial %d\n", j, q, j->
serial);
447 pthread_mutex_lock(&p->
pool_m);
467 pthread_mutex_unlock(&p->
pool_m);
470 fprintf(stderr,
"Dispatched (serial %d)\n", j->
serial);
484 void *(*func)(
void *arg),
void *arg,
int nonblock) {
503 fprintf(stderr,
"Dispatching job for queue %p, serial %d\n", q, j->
serial);
506 pthread_mutex_lock(&p->
pool_m);
509 pthread_mutex_unlock(&p->
pool_m);
540 fprintf(stderr,
"Dispatched (serial %d)\n", j->
serial);
548 pthread_mutex_unlock(&p->
pool_m);
562 fprintf(stderr,
"Flushing pool %p\n", p);
566 pthread_mutex_lock(&p->
pool_m);
570 pthread_mutex_unlock(&p->
pool_m);
573 fprintf(stderr,
"Flushed complete for pool %p, njobs=%d, nwaiting=%d\n",
592 fprintf(stderr,
"Destroying pool %p, kill=%d\n", p, kill);
597 pthread_mutex_lock(&p->
pool_m);
601 fprintf(stderr,
"Sending shutdown request\n");
605 pthread_mutex_unlock(&p->
pool_m);
608 fprintf(stderr,
"Shutdown complete\n");
610 for (i = 0; i < p->
tsize; i++)
611 pthread_join(p->
t[i], NULL);
613 for (i = 0; i < p->
tsize; i++)
614 pthread_kill(p->
t[i], SIGINT);
617 pthread_mutex_destroy(&p->
pool_m);
618 pthread_cond_destroy(&p->
empty_c);
620 pthread_cond_destroy(&p->
full_c);
623 fprintf(stderr,
"Total time=%f\n", p->
total_time / 1000000.0);
624 fprintf(stderr,
"Wait time=%f\n", p->
wait_time / 1000000.0);
625 fprintf(stderr,
"%d%% utilisation\n",
633 fprintf(stderr,
"Destroyed pool %p\n", p);
647 void *doit(
void *arg) {
649 int job = *(
int *)arg;
652 printf(
"Worker: execute job %d\n", job);
654 usleep(random() % 1000000);
656 for (k = 0; k < 100; k++) {
657 for (i = 0; i < 100000; i++) {
669 printf(
"Worker: job %d terminating, x=%d\n", job, x);
673 res = malloc(
sizeof(*res));
681 int main(
int argc,
char **argv) {
688 for (i = 0; i < 20; i++) {
689 int *ip = malloc(
sizeof(*ip));
691 printf(
"Submitting %d\n", i);
696 printf(
"RESULT: %d\n", *(
int *)r->
data);
704 printf(
"RESULT: %d\n", *(
int *)r->
data);