1 /* worker thread pool based on POSIX threads
2 * author: John Tsiombikas <nuclear@member.fsf.org>
3 * This code is public domain.
/* callbacks stored with a queued job: the worker thread calls work and
 * then done, passing each of them the job's data pointer
 */
17 dtx_tpool_callback work, done;
/* next job in the singly-linked work queue */
18 struct work_item *next;
/* state of one worker thread pool
 * (only part of the member list is visible in this excerpt)
 */
21 struct dtx_thread_pool {
/* head and tail of the linked list of jobs waiting to be picked up */
26 struct work_item *workq, *workq_tail;
/* held whenever the queue or the pool's job counters are read or modified */
27 pthread_mutex_t workq_mutex;
/* worker threads sleep on this; it is broadcast after an enqueue outside
 * of a batch, at the end of a batch, and when the pool is destroyed
 */
28 pthread_cond_t workq_condvar;
30 int nactive; /* number of active workers (not sleeping) */
/* broadcast by a worker every time it finishes a job; the
 * dtx_tpool_wait* functions sleep on it
 */
32 pthread_cond_t done_condvar;
/* entry point handed to pthread_create for every worker thread;
 * receives the pool as its argument (defined further down)
 */
38 static void *thread_func(void *args);
40 struct dtx_thread_pool *dtx_tpool_create(int num_threads)
43 struct dtx_thread_pool *tpool;
45 if(!(tpool = calloc(1, sizeof *tpool))) {
48 pthread_mutex_init(&tpool->workq_mutex, 0);
49 pthread_cond_init(&tpool->workq_condvar, 0);
50 pthread_cond_init(&tpool->done_condvar, 0);
52 if(num_threads <= 0) {
53 num_threads = dtx_tpool_num_processors();
55 tpool->num_threads = num_threads;
57 if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
61 for(i=0; i<num_threads; i++) {
62 if(pthread_create(tpool->threads + i, 0, thread_func, tpool) == -1) {
63 tpool->threads[i] = 0;
64 dtx_tpool_destroy(tpool);
71 void dtx_tpool_destroy(struct dtx_thread_pool *tpool)
76 dtx_tpool_clear(tpool);
77 tpool->should_quit = 1;
79 pthread_cond_broadcast(&tpool->workq_condvar);
82 printf("dtx_thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
85 for(i=0; i<tpool->num_threads; i++) {
86 pthread_join(tpool->threads[i], 0);
94 pthread_mutex_destroy(&tpool->workq_mutex);
95 pthread_cond_destroy(&tpool->workq_condvar);
96 pthread_cond_destroy(&tpool->done_condvar);
/* Marks the start of a batch of enqueue calls.
 * NOTE(review): the body is not visible in this excerpt; presumably it sets
 * tpool->in_batch, the flag dtx_tpool_enqueue tests before waking the
 * workers - confirm.
 */
99 void dtx_tpool_begin_batch(struct dtx_thread_pool *tpool)
/* Ends a batch of enqueue calls by broadcasting workq_condvar, which wakes
 * every worker sleeping on it.
 * NOTE(review): one statement of the body is not visible in this excerpt;
 * presumably it clears tpool->in_batch - confirm.
 */
104 void dtx_tpool_end_batch(struct dtx_thread_pool *tpool)
/* wake all sleeping workers */
107 pthread_cond_broadcast(&tpool->workq_condvar);
/* Queue a job for execution by the pool.
 *
 * A work_item is heap-allocated for the job, filled in with the two
 * callbacks, and linked at the tail of the work queue under workq_mutex.
 * Unless the pool is in a batch, all workers are then woken up.
 *
 * NOTE(review): the return statements and the lines that store data and
 * update the queue size are not visible in this excerpt.
 */
110 int dtx_tpool_enqueue(struct dtx_thread_pool *tpool, void *data,
111 dtx_tpool_callback work_func, dtx_tpool_callback done_func)
113 struct work_item *job;
/* allocation failure is handled on lines not visible here */
115 if(!(job = malloc(sizeof *job))) {
118 job->work = work_func;
119 job->done = done_func;
123 pthread_mutex_lock(&tpool->workq_mutex);
/* non-empty queue: append after the current tail */
125 tpool->workq_tail->next = job;
126 tpool->workq_tail = job;
/* empty queue: the new job is both head and tail */
128 tpool->workq = tpool->workq_tail = job;
131 pthread_mutex_unlock(&tpool->workq_mutex);
/* NOTE(review): in_batch is read after the mutex has been released */
133 if(!tpool->in_batch) {
134 pthread_cond_broadcast(&tpool->workq_condvar);
/* Empties the work queue: walks the list of waiting jobs under workq_mutex
 * and resets both the head and the tail pointer to null.
 * NOTE(review): what happens to each unlinked item (presumably free(tmp))
 * and any reset of the queue size are on lines not visible here - confirm.
 */
139 void dtx_tpool_clear(struct dtx_thread_pool *tpool)
141 pthread_mutex_lock(&tpool->workq_mutex);
142 while(tpool->workq) {
/* remember the current head, then advance the head to the next job */
143 void *tmp = tpool->workq;
144 tpool->workq = tpool->workq->next;
147 tpool->workq = tpool->workq_tail = 0;
149 pthread_mutex_unlock(&tpool->workq_mutex);
/* Returns an int that is computed while holding workq_mutex.
 * NOTE(review): the line producing the value is not visible in this
 * excerpt; presumably it is tpool->qsize, the number of jobs waiting in
 * the queue - confirm.
 */
152 int dtx_tpool_queued_jobs(struct dtx_thread_pool *tpool)
155 pthread_mutex_lock(&tpool->workq_mutex);
157 pthread_mutex_unlock(&tpool->workq_mutex);
161 int dtx_tpool_active_jobs(struct dtx_thread_pool *tpool)
164 pthread_mutex_lock(&tpool->workq_mutex);
165 res = tpool->nactive;
166 pthread_mutex_unlock(&tpool->workq_mutex);
170 int dtx_tpool_pending_jobs(struct dtx_thread_pool *tpool)
173 pthread_mutex_lock(&tpool->workq_mutex);
174 res = tpool->qsize + tpool->nactive;
175 pthread_mutex_unlock(&tpool->workq_mutex);
179 void dtx_tpool_wait(struct dtx_thread_pool *tpool)
181 pthread_mutex_lock(&tpool->workq_mutex);
182 while(tpool->nactive || tpool->qsize) {
183 pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
185 pthread_mutex_unlock(&tpool->workq_mutex);
/* Blocks until the number of pending jobs (queued plus running) drops below
 * the number that was pending when the call was made.
 * The count is re-read after every wakeup, so jobs enqueued in the meantime
 * are included in the comparison.
 * NOTE(review): a line between the snapshot and the loop is not visible in
 * this excerpt; presumably it skips the wait when nothing is pending -
 * confirm.
 */
188 void dtx_tpool_wait_one(struct dtx_thread_pool *tpool)
191 pthread_mutex_lock(&tpool->workq_mutex);
/* snapshot of the pending count on entry */
192 cur_pending = tpool->qsize + tpool->nactive;
194 while(tpool->qsize + tpool->nactive >= cur_pending) {
195 pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
198 pthread_mutex_unlock(&tpool->workq_mutex);
201 long dtx_tpool_timedwait(struct dtx_thread_pool *tpool, long timeout)
204 struct timespec tout_ts;
205 struct timeval tv0, tv;
206 gettimeofday(&tv0, 0);
208 sec = timeout / 1000;
209 tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
210 tout_ts.tv_sec = tv0.tv_sec + sec;
212 pthread_mutex_lock(&tpool->workq_mutex);
213 while(tpool->nactive || tpool->qsize) {
214 if(pthread_cond_timedwait(&tpool->done_condvar,
215 &tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
219 pthread_mutex_unlock(&tpool->workq_mutex);
221 gettimeofday(&tv, 0);
222 return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
225 static void *thread_func(void *args)
227 struct dtx_thread_pool *tpool = args;
229 pthread_mutex_lock(&tpool->workq_mutex);
230 while(!tpool->should_quit) {
231 pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);
233 while(!tpool->should_quit && tpool->workq) {
234 /* grab the first job */
235 struct work_item *job = tpool->workq;
236 tpool->workq = tpool->workq->next;
238 tpool->workq_tail = 0;
241 pthread_mutex_unlock(&tpool->workq_mutex);
244 job->work(job->data);
246 job->done(job->data);
249 pthread_mutex_lock(&tpool->workq_mutex);
250 /* notify everyone interested that we're done with this job */
251 pthread_cond_broadcast(&tpool->done_condvar);
255 pthread_mutex_unlock(&tpool->workq_mutex);
259 #endif /* USE_THREADS */
261 /* The following highly platform-specific code detects the number
262 * of processors available in the system. It's used by the thread pool
263 * to autodetect how many threads to spawn.
264 * Currently works on: Linux, BSD, Darwin, and Windows.
267 #if defined(__APPLE__) && defined(__MACH__)
276 #if defined(unix) || defined(__unix__)
280 # include <sys/sysctl.h>
284 #if defined(WIN32) || defined(__WIN32__)
289 int dtx_tpool_num_processors(void)
291 #if defined(unix) || defined(__unix__)
292 # if defined(__bsd__)
293 /* BSD systems provide the num.processors through sysctl */
294 int num, mib[] = {CTL_HW, HW_NCPU};
295 size_t len = sizeof num;
297 sysctl(mib, 2, &num, &len, 0, 0);
300 # elif defined(__sgi)
301 /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
302 return sysconf(_SC_NPROC_ONLN);
304 /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
305 return sysconf(_SC_NPROCESSORS_ONLN);
306 # endif /* bsd/sgi/other */
308 #elif defined(WIN32) || defined(__WIN32__)
309 /* under windows we need to call GetSystemInfo */
311 GetSystemInfo(&info);
312 return info.dwNumberOfProcessors;