1 /* worker thread pool based on POSIX threads
2 * author: John Tsiombikas <nuclear@member.fsf.org>
3 * This code is public domain.
/* Fragment of struct work_item (queue node): the struct's opening line and
 * its data-pointer member are elided in this excerpt. */
/* work: the job function; done: optional completion callback (see thread_func) */
15 tpool_callback work, done;
/* next: intrusive singly-linked list link used by workq/workq_tail */
16 struct work_item *next;
/* Fragment of struct thread_pool: head/tail of the pending-job FIFO, plus the
 * mutex and condvars that guard all queue state below. Other members referenced
 * elsewhere in this file (threads, num_threads, qsize, in_batch, should_quit)
 * are declared on elided lines. */
24 struct work_item *workq, *workq_tail;
/* workq_mutex guards workq/workq_tail and the counters (qsize, nactive) */
25 pthread_mutex_t workq_mutex;
/* signalled when new jobs are enqueued (or on shutdown) */
26 pthread_cond_t workq_condvar;
28 int nactive; /* number of active workers (not sleeping) */
/* signalled each time a worker finishes a job; used by the tpool_wait* family */
30 pthread_cond_t done_condvar;
/* forward declaration of the worker-thread entry point (defined below) */
36 static void *thread_func(void *args);
/* tpool_create: allocate and initialize a pool of num_threads workers.
 * num_threads <= 0 means autodetect via tpool_num_processors().
 * Returns the pool, or presumably NULL on allocation failure -- the error
 * paths and cleanup are on lines elided from this excerpt. */
38 struct thread_pool *tpool_create(int num_threads)
41 struct thread_pool *tpool;
/* calloc zero-initializes every member: empty queue, zero counters/flags */
43 if(!(tpool = calloc(1, sizeof *tpool))) {
46 pthread_mutex_init(&tpool->workq_mutex, 0);
47 pthread_cond_init(&tpool->workq_condvar, 0);
48 pthread_cond_init(&tpool->done_condvar, 0);
50 if(num_threads <= 0) {
51 num_threads = tpool_num_processors();
53 tpool->num_threads = num_threads;
55 if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
59 for(i=0; i<num_threads; i++) {
/* BUG(review): pthread_create returns 0 on success or a positive errno
 * value on failure -- it never returns -1, so this comparison can never be
 * true and thread-creation failures go undetected. Should be "!= 0". */
60 if(pthread_create(tpool->threads + i, 0, thread_func, tpool) == -1) {
61 tpool->threads[i] = 0;
/* tpool_destroy: signal all workers to quit, join them, then tear down the
 * synchronization primitives. NOTE(review): whether should_quit is set while
 * holding workq_mutex is not visible here (lines 70-76 elided) -- if not, the
 * flag write races with the workers' reads; confirm against the full source. */
69 void tpool_destroy(struct thread_pool *tpool)
75 tpool->should_quit = 1;
/* wake every worker so each can observe should_quit and exit its loop */
77 pthread_cond_broadcast(&tpool->workq_condvar);
/* NOTE(review): a library printing progress to stdout is questionable;
 * consider making this diagnostic optional or routing it to stderr */
80 printf("thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
83 for(i=0; i<tpool->num_threads; i++) {
84 pthread_join(tpool->threads[i], 0);
/* all workers joined: safe to destroy the mutex and condvars */
92 pthread_mutex_destroy(&tpool->workq_mutex);
93 pthread_cond_destroy(&tpool->workq_condvar);
94 pthread_cond_destroy(&tpool->done_condvar);
/* tpool_begin_batch: start a batch of enqueues; while a batch is open,
 * tpool_enqueue suppresses its worker wakeup (see the in_batch check there),
 * deferring it to tpool_end_batch's broadcast. Body elided in this excerpt --
 * presumably sets tpool->in_batch; confirm against the full source. */
97 void tpool_begin_batch(struct thread_pool *tpool)
/* tpool_end_batch: close a batch and wake all workers once for the whole
 * group of jobs enqueued during it (clearing of in_batch is on an elided line). */
102 void tpool_end_batch(struct thread_pool *tpool)
105 pthread_cond_broadcast(&tpool->workq_condvar);
/* tpool_enqueue: append one job to the work queue.
 * data is passed to work_func on a worker thread, and then to done_func
 * (completion callback) if one was given. Returns 0 on success, presumably
 * -1 on allocation failure (error path elided).
 * NOTE(review): the assignments of job->data and job->next, the if/else
 * around the tail-append, and the qsize increment sit on elided lines. */
108 int tpool_enqueue(struct thread_pool *tpool, void *data,
109 tpool_callback work_func, tpool_callback done_func)
111 struct work_item *job;
113 if(!(job = malloc(sizeof *job))) {
116 job->work = work_func;
117 job->done = done_func;
121 pthread_mutex_lock(&tpool->workq_mutex);
/* non-empty queue: link after the current tail */
123 tpool->workq_tail->next = job;
124 tpool->workq_tail = job;
/* empty queue: job becomes both head and tail */
126 tpool->workq = tpool->workq_tail = job;
129 pthread_mutex_unlock(&tpool->workq_mutex);
/* inside a batch the wakeup is deferred to tpool_end_batch */
131 if(!tpool->in_batch) {
132 pthread_cond_broadcast(&tpool->workq_condvar);
/* tpool_clear: discard all jobs still waiting in the queue (does not affect
 * jobs already being executed by workers).
 * NOTE(review): the statement that frees tmp and the qsize reset are on
 * elided lines -- confirm tmp is freed, otherwise this loop leaks every node. */
137 void tpool_clear(struct thread_pool *tpool)
139 pthread_mutex_lock(&tpool->workq_mutex);
140 while(tpool->workq) {
141 void *tmp = tpool->workq;
142 tpool->workq = tpool->workq->next;
/* queue fully drained: reset head and tail together */
145 tpool->workq = tpool->workq_tail = 0;
147 pthread_mutex_unlock(&tpool->workq_mutex);
/* tpool_queued_jobs: number of jobs waiting in the queue (not yet picked up
 * by a worker). The read of tpool->qsize into the result and the return are
 * on elided lines; the mutex makes the snapshot consistent. */
150 int tpool_queued_jobs(struct thread_pool *tpool)
153 pthread_mutex_lock(&tpool->workq_mutex);
155 pthread_mutex_unlock(&tpool->workq_mutex);
/* tpool_active_jobs: number of jobs currently being executed by workers
 * (snapshot of nactive taken under the queue mutex; return line elided). */
159 int tpool_active_jobs(struct thread_pool *tpool)
162 pthread_mutex_lock(&tpool->workq_mutex);
163 res = tpool->nactive;
164 pthread_mutex_unlock(&tpool->workq_mutex);
/* tpool_pending_jobs: total outstanding work -- queued plus in-flight.
 * Both counters are read in one critical section so the sum is consistent. */
168 int tpool_pending_jobs(struct thread_pool *tpool)
171 pthread_mutex_lock(&tpool->workq_mutex);
172 res = tpool->qsize + tpool->nactive;
173 pthread_mutex_unlock(&tpool->workq_mutex);
/* tpool_wait: block the caller until the pool is completely idle (empty
 * queue AND no worker executing a job). Standard condvar pattern: re-check
 * the predicate in a loop to tolerate spurious wakeups; done_condvar is
 * broadcast by workers after each completed job (see thread_func). */
177 void tpool_wait(struct thread_pool *tpool)
179 pthread_mutex_lock(&tpool->workq_mutex);
180 while(tpool->nactive || tpool->qsize) {
181 pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
183 pthread_mutex_unlock(&tpool->workq_mutex);
/* tpool_wait_one: block until at least one job completes, i.e. until the
 * pending count drops below the value sampled at entry.
 * NOTE(review): if cur_pending is 0 on entry the loop condition (0 >= 0)
 * would block forever; an early-return guard presumably sits on the elided
 * line between the sample and the loop -- confirm against the full source. */
186 void tpool_wait_one(struct thread_pool *tpool)
189 pthread_mutex_lock(&tpool->workq_mutex);
190 cur_pending = tpool->qsize + tpool->nactive;
192 while(tpool->qsize + tpool->nactive >= cur_pending) {
193 pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
196 pthread_mutex_unlock(&tpool->workq_mutex);
/* tpool_timedwait: like tpool_wait but gives up after `timeout` milliseconds.
 * Returns the elapsed time in milliseconds. Builds an absolute deadline from
 * the current time-of-day because pthread_cond_timedwait takes absolute time. */
199 long tpool_timedwait(struct thread_pool *tpool, long timeout)
201 struct timespec tout_ts;
202 struct timeval tv0, tv;
205 gettimeofday(&tv0, 0);
207 sec = timeout / 1000;
/* BUG(review): tv_nsec can end up >= 1000000000 here (usec*1000 alone can be
 * ~999999000, plus up to 999000000 from the timeout remainder) and there is
 * no carry into tv_sec on any visible line. POSIX requires tv_nsec in
 * [0, 1e9); pthread_cond_timedwait fails with EINVAL on such a timespec,
 * turning this into a busy spin until the predicate clears. Normalize:
 * tv_sec += tv_nsec / 1000000000; tv_nsec %= 1000000000. */
208 tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
209 tout_ts.tv_sec = tv0.tv_sec + sec;
211 pthread_mutex_lock(&tpool->workq_mutex);
212 while(tpool->nactive || tpool->qsize) {
/* stop waiting once the absolute deadline passes */
213 if(pthread_cond_timedwait(&tpool->done_condvar,
214 &tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
218 pthread_mutex_unlock(&tpool->workq_mutex);
/* report actual elapsed wall-clock time in milliseconds */
220 gettimeofday(&tv, 0);
221 return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
/* thread_func: worker-thread main loop. Sleeps on workq_condvar, then drains
 * the queue, running each job's work callback (and completion callback)
 * OUTSIDE the mutex so jobs execute concurrently. The nactive increment/
 * decrement, the free of the job node, and the NULL check guarding the done
 * callback are on elided lines -- confirm against the full source.
 * NOTE(review): the first statement inside the outer loop is cond_wait, with
 * no queue check before it; a job enqueued (and broadcast) before a worker
 * first reaches the wait appears to sit unserviced until the next broadcast.
 * Verify on the full source whether startup ordering rules this out. */
224 static void *thread_func(void *args)
226 struct thread_pool *tpool = args;
228 pthread_mutex_lock(&tpool->workq_mutex);
229 while(!tpool->should_quit) {
/* releases workq_mutex while asleep, reacquires it on wakeup */
230 pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);
232 while(!tpool->should_quit && tpool->workq) {
233 /* grab the first job */
234 struct work_item *job = tpool->workq;
235 tpool->workq = tpool->workq->next;
/* queue drained: keep the tail pointer consistent with the empty head */
237 tpool->workq_tail = 0;
/* run the job unlocked so other workers can dequeue concurrently */
240 pthread_mutex_unlock(&tpool->workq_mutex);
243 job->work(job->data);
245 job->done(job->data);
248 pthread_mutex_lock(&tpool->workq_mutex);
249 /* notify everyone interested that we're done with this job */
250 pthread_cond_broadcast(&tpool->done_condvar);
254 pthread_mutex_unlock(&tpool->workq_mutex);
260 /* The following highly platform-specific code detects the number
261 * of processors available in the system. It's used by the thread pool
262 * to autodetect how many threads to spawn.
263 * Currently works on: Linux, BSD, Darwin, and Windows.
266 #if defined(__APPLE__) && defined(__MACH__)
275 #if defined(unix) || defined(__unix__)
279 # include <sys/sysctl.h>
283 #if defined(WIN32) || defined(__WIN32__)
/* tpool_num_processors: best-effort detection of the number of online CPUs,
 * used by tpool_create when num_threads <= 0. Platform dispatch: BSD sysctl,
 * IRIX/Linux sysconf, Windows GetSystemInfo. The returns of the BSD branch
 * and the SYSTEM_INFO declaration are on elided lines. */
288 int tpool_num_processors(void)
290 #if defined(unix) || defined(__unix__)
291 # if defined(__bsd__)
292 /* BSD systems provide the num.processors through sysctl */
293 int num, mib[] = {CTL_HW, HW_NCPU};
294 size_t len = sizeof num;
/* NOTE(review): sysctl's return value is unchecked; on failure num is
 * used uninitialized. Consider defaulting num to 1 or checking the call. */
296 sysctl(mib, 2, &num, &len, 0, 0);
299 # elif defined(__sgi)
300 /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
301 return sysconf(_SC_NPROC_ONLN);
303 /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
304 return sysconf(_SC_NPROCESSORS_ONLN);
305 # endif /* bsd/sgi/other */
307 #elif defined(WIN32) || defined(__WIN32__)
308 /* under windows we need to call GetSystemInfo */
310 GetSystemInfo(&info);
311 return info.dwNumberOfProcessors;