/* worker thread pool based on POSIX threads
 * author: John Tsiombikas <nuclear@member.fsf.org>
 * This code is public domain.
 */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include "tpool.h"	/* assumed header: declares the tpool_* interface and tpool_callback */

/* everything below is compiled out when NO_THREADS is defined,
 * matching the #endif at the end of the file
 */
#ifndef NO_THREADS

#include <pthread.h>
#include <sys/time.h>
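/* Usage sketch (illustrative, not part of the original file; assumes tpool.h
 * declares tpool_callback as a function pointer taking the void *data):
 *
 *	void work_func(void *data);	// runs on a worker thread
 *
 *	struct thread_pool *tp = tpool_create(0);	// 0: one thread per processor
 *	tpool_enqueue(tp, &job_data, work_func, 0);	// 0: no completion callback
 *	tpool_wait(tp);		// block until the queue drains and workers go idle
 *	tpool_destroy(tp);
 */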
struct work_item {
	void *data;
	tpool_callback work, done;
	struct work_item *next;
};

struct thread_pool {
	pthread_t *threads;
	int num_threads;

	int qsize;
	struct work_item *workq, *workq_tail;
	pthread_mutex_t workq_mutex;
	pthread_cond_t workq_condvar;

	int nactive;	/* number of active workers (not sleeping) */

	pthread_cond_t done_condvar;

	int should_quit;
	int in_batch;
};
static void *thread_func(void *args);
struct thread_pool *tpool_create(int num_threads)
{
	int i;
	struct thread_pool *tpool;

	if(!(tpool = calloc(1, sizeof *tpool))) {
		return 0;
	}
	pthread_mutex_init(&tpool->workq_mutex, 0);
	pthread_cond_init(&tpool->workq_condvar, 0);
	pthread_cond_init(&tpool->done_condvar, 0);

	if(num_threads <= 0) {
		num_threads = tpool_num_processors();
	}
	tpool->num_threads = num_threads;

	if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
		free(tpool);
		return 0;
	}
	for(i=0; i<num_threads; i++) {
		/* pthread_create returns 0 on success or an error number, never -1 */
		if(pthread_create(tpool->threads + i, 0, thread_func, tpool) != 0) {
			tpool->threads[i] = 0;
			tpool->num_threads = i;	/* join only the threads we managed to start */
			tpool_destroy(tpool);
			return 0;
		}
	}
	return tpool;
}
void tpool_destroy(struct thread_pool *tpool)
{
	int i;

	if(!tpool) return;

	tpool_clear(tpool);

	/* set the quit flag and wake the workers while holding the mutex, to
	 * avoid a lost wakeup if a worker is about to block on the condvar
	 */
	pthread_mutex_lock(&tpool->workq_mutex);
	tpool->should_quit = 1;
	pthread_cond_broadcast(&tpool->workq_condvar);
	pthread_mutex_unlock(&tpool->workq_mutex);

	printf("thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
	fflush(stdout);

	for(i=0; i<tpool->num_threads; i++) {
		pthread_join(tpool->threads[i], 0);
		putchar('.');
		fflush(stdout);
	}
	putchar('\n');
	free(tpool->threads);

	pthread_mutex_destroy(&tpool->workq_mutex);
	pthread_cond_destroy(&tpool->workq_condvar);
	pthread_cond_destroy(&tpool->done_condvar);
	free(tpool);
}
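/* Batched enqueueing (usage sketch, based on how in_batch is used below):
 * tpool_enqueue wakes the workers once per job, so for many small jobs it is
 * cheaper to queue them all first and wake the workers once:
 *
 *	tpool_begin_batch(tp);
 *	for(i=0; i<njobs; i++) {
 *		tpool_enqueue(tp, jobs + i, work_func, 0);
 *	}
 *	tpool_end_batch(tp);	// single broadcast wakes the workers
 */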
void tpool_begin_batch(struct thread_pool *tpool)
{
	tpool->in_batch = 1;
}

void tpool_end_batch(struct thread_pool *tpool)
{
	tpool->in_batch = 0;
	pthread_cond_broadcast(&tpool->workq_condvar);
}
int tpool_enqueue(struct thread_pool *tpool, void *data,
		tpool_callback work_func, tpool_callback done_func)
{
	struct work_item *job;

	if(!(job = malloc(sizeof *job))) {
		return -1;
	}
	job->work = work_func;
	job->done = done_func;
	job->data = data;
	job->next = 0;

	pthread_mutex_lock(&tpool->workq_mutex);
	if(tpool->workq) {
		tpool->workq_tail->next = job;
		tpool->workq_tail = job;
	} else {
		tpool->workq = tpool->workq_tail = job;
	}
	tpool->qsize++;
	pthread_mutex_unlock(&tpool->workq_mutex);

	if(!tpool->in_batch) {
		pthread_cond_broadcast(&tpool->workq_condvar);
	}
	return 0;
}
void tpool_clear(struct thread_pool *tpool)
{
	pthread_mutex_lock(&tpool->workq_mutex);
	while(tpool->workq) {
		void *tmp = tpool->workq;
		tpool->workq = tpool->workq->next;
		free(tmp);
	}
	tpool->workq = tpool->workq_tail = 0;
	tpool->qsize = 0;
	pthread_mutex_unlock(&tpool->workq_mutex);
}
int tpool_queued_jobs(struct thread_pool *tpool)
{
	int res;
	pthread_mutex_lock(&tpool->workq_mutex);
	res = tpool->qsize;
	pthread_mutex_unlock(&tpool->workq_mutex);
	return res;
}
int tpool_active_jobs(struct thread_pool *tpool)
{
	int res;
	pthread_mutex_lock(&tpool->workq_mutex);
	res = tpool->nactive;
	pthread_mutex_unlock(&tpool->workq_mutex);
	return res;
}
int tpool_pending_jobs(struct thread_pool *tpool)
{
	int res;
	pthread_mutex_lock(&tpool->workq_mutex);
	res = tpool->qsize + tpool->nactive;
	pthread_mutex_unlock(&tpool->workq_mutex);
	return res;
}
void tpool_wait(struct thread_pool *tpool)
{
	pthread_mutex_lock(&tpool->workq_mutex);
	while(tpool->nactive || tpool->qsize) {
		pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
	}
	pthread_mutex_unlock(&tpool->workq_mutex);
}
void tpool_wait_one(struct thread_pool *tpool)
{
	int cur_pending;

	pthread_mutex_lock(&tpool->workq_mutex);
	cur_pending = tpool->qsize + tpool->nactive;
	if(cur_pending) {
		/* wait until the pending count drops below what it was on entry */
		while(tpool->qsize + tpool->nactive >= cur_pending) {
			pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
		}
	}
	pthread_mutex_unlock(&tpool->workq_mutex);
}
long tpool_timedwait(struct thread_pool *tpool, long timeout)
{
	struct timespec tout_ts;
	struct timeval tv0, tv;
	long sec;

	gettimeofday(&tv0, 0);

	sec = timeout / 1000;
	tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
	tout_ts.tv_sec = tv0.tv_sec + sec;
	/* normalize: tv_nsec may exceed a second, and values >= 10^9 make
	 * pthread_cond_timedwait fail with EINVAL
	 */
	if(tout_ts.tv_nsec >= 1000000000) {
		tout_ts.tv_sec += tout_ts.tv_nsec / 1000000000;
		tout_ts.tv_nsec %= 1000000000;
	}

	pthread_mutex_lock(&tpool->workq_mutex);
	while(tpool->nactive || tpool->qsize) {
		if(pthread_cond_timedwait(&tpool->done_condvar,
				&tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
			break;
		}
	}
	pthread_mutex_unlock(&tpool->workq_mutex);

	/* report how long we actually waited, in milliseconds */
	gettimeofday(&tv, 0);
	return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
}
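/* tpool_timedwait usage sketch: the timeout and the return value are both in
 * milliseconds, and the function returns early if all jobs drain, so a caller
 * can estimate whether it timed out by comparing the two:
 *
 *	if(tpool_timedwait(tp, 500) >= 500) {
 *		// probably timed out with jobs still pending
 *	}
 */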
static void *thread_func(void *args)
{
	struct thread_pool *tpool = args;

	pthread_mutex_lock(&tpool->workq_mutex);
	while(!tpool->should_quit) {
		pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);

		while(!tpool->should_quit && tpool->workq) {
			/* grab the first job */
			struct work_item *job = tpool->workq;
			tpool->workq = tpool->workq->next;
			if(!tpool->workq) {
				tpool->workq_tail = 0;
			}
			tpool->nactive++;
			tpool->qsize--;

			/* run the callbacks with the mutex released, so that the
			 * other workers can pick up jobs in parallel
			 */
			pthread_mutex_unlock(&tpool->workq_mutex);

			job->work(job->data);
			if(job->done) {
				job->done(job->data);
			}
			free(job);

			pthread_mutex_lock(&tpool->workq_mutex);
			/* notify everyone interested that we're done with this job */
			pthread_cond_broadcast(&tpool->done_condvar);
			tpool->nactive--;
		}
	}
	pthread_mutex_unlock(&tpool->workq_mutex);
	return 0;
}
/* The following highly platform-specific code detects the number
 * of processors available in the system. It's used by the thread pool
 * to autodetect how many threads to spawn.
 * Currently works on: Linux, BSD, Darwin, and Windows.
 */

#if defined(__APPLE__) && defined(__MACH__)
# ifndef __unix__
#  define __unix__	1
# endif
# ifndef __bsd__
#  define __bsd__	1
# endif
#endif	/* __APPLE__ && __MACH__ */

#if defined(unix) || defined(__unix__)
# include <unistd.h>
# ifdef __bsd__
#  include <sys/sysctl.h>
# endif
#endif

#if defined(WIN32) || defined(__WIN32__)
# include <windows.h>
#endif
int tpool_num_processors(void)
{
#if defined(unix) || defined(__unix__)
# if defined(__bsd__)
	/* BSD systems provide the number of processors through sysctl */
	int num, mib[] = {CTL_HW, HW_NCPU};
	size_t len = sizeof num;

	sysctl(mib, 2, &num, &len, 0, 0);
	return num;

# elif defined(__sgi)
	/* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
	return sysconf(_SC_NPROC_ONLN);
# else
	/* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
	return sysconf(_SC_NPROCESSORS_ONLN);
# endif	/* bsd/sgi/other */

#elif defined(WIN32) || defined(__WIN32__)
	/* under windows we need to call GetSystemInfo */
	SYSTEM_INFO info;
	GetSystemInfo(&info);
	return info.dwNumberOfProcessors;
#else
	return 1;	/* unknown platform: fall back to a single thread */
#endif
}

#endif	/* !def NO_THREADS */