1 /* worker thread pool based on POSIX threads
2 * author: John Tsiombikas <nuclear@member.fsf.org>
3 * This code is public domain.
15 tpool_callback work, done;
16 struct work_item *next;
/* Shared state of one thread pool instance.
 * All mutable fields are protected by workq_mutex.
 */
struct thread_pool {
	pthread_t *threads;		/* worker thread ids (num_threads entries) */
	int num_threads;		/* number of workers spawned */

	int qsize;			/* jobs queued but not yet started */
	struct work_item *workq, *workq_tail;	/* FIFO work queue head/tail */
	pthread_mutex_t workq_mutex;	/* guards the queue and the counters */
	pthread_cond_t workq_condvar;	/* signalled when work arrives or on shutdown */

	int nactive; /* number of active workers (not sleeping) */

	pthread_cond_t done_condvar;	/* broadcast whenever a worker finishes a job */

	int should_quit;		/* set during destroy to stop the workers */
	int in_batch;			/* while set, enqueue defers its wakeup broadcast */
};
36 static void *thread_func(void *args);
38 struct thread_pool *tpool_create(int num_threads)
41 struct thread_pool *tpool;
43 if(!(tpool = calloc(1, sizeof *tpool))) {
46 pthread_mutex_init(&tpool->workq_mutex, 0);
47 pthread_cond_init(&tpool->workq_condvar, 0);
48 pthread_cond_init(&tpool->done_condvar, 0);
50 if(num_threads <= 0) {
51 num_threads = tpool_num_processors();
53 tpool->num_threads = num_threads;
55 if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
59 for(i=0; i<num_threads; i++) {
60 if(pthread_create(tpool->threads + i, 0, thread_func, tpool) == -1) {
61 tpool->threads[i] = 0;
69 void tpool_destroy(struct thread_pool *tpool)
75 tpool->should_quit = 1;
77 pthread_cond_broadcast(&tpool->workq_condvar);
80 printf("thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
83 for(i=0; i<tpool->num_threads; i++) {
84 pthread_join(tpool->threads[i], 0);
92 pthread_mutex_destroy(&tpool->workq_mutex);
93 pthread_cond_destroy(&tpool->workq_condvar);
94 pthread_cond_destroy(&tpool->done_condvar);
97 void tpool_begin_batch(struct thread_pool *tpool)
102 void tpool_end_batch(struct thread_pool *tpool)
105 pthread_cond_broadcast(&tpool->workq_condvar);
108 int tpool_enqueue(struct thread_pool *tpool, void *data,
109 tpool_callback work_func, tpool_callback done_func)
111 struct work_item *job;
113 if(!(job = malloc(sizeof *job))) {
116 job->work = work_func;
117 job->done = done_func;
121 pthread_mutex_lock(&tpool->workq_mutex);
123 tpool->workq_tail->next = job;
124 tpool->workq_tail = job;
126 tpool->workq = tpool->workq_tail = job;
129 pthread_mutex_unlock(&tpool->workq_mutex);
131 if(!tpool->in_batch) {
132 pthread_cond_broadcast(&tpool->workq_condvar);
137 void tpool_clear(struct thread_pool *tpool)
139 pthread_mutex_lock(&tpool->workq_mutex);
140 while(tpool->workq) {
141 void *tmp = tpool->workq;
142 tpool->workq = tpool->workq->next;
145 tpool->workq = tpool->workq_tail = 0;
147 pthread_mutex_unlock(&tpool->workq_mutex);
150 int tpool_queued_jobs(struct thread_pool *tpool)
153 pthread_mutex_lock(&tpool->workq_mutex);
155 pthread_mutex_unlock(&tpool->workq_mutex);
159 int tpool_active_jobs(struct thread_pool *tpool)
162 pthread_mutex_lock(&tpool->workq_mutex);
163 res = tpool->nactive;
164 pthread_mutex_unlock(&tpool->workq_mutex);
168 int tpool_pending_jobs(struct thread_pool *tpool)
171 pthread_mutex_lock(&tpool->workq_mutex);
172 res = tpool->qsize + tpool->nactive;
173 pthread_mutex_unlock(&tpool->workq_mutex);
177 void tpool_wait(struct thread_pool *tpool)
179 pthread_mutex_lock(&tpool->workq_mutex);
180 while(tpool->nactive || tpool->qsize) {
181 pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
183 pthread_mutex_unlock(&tpool->workq_mutex);
186 void tpool_wait_one(struct thread_pool *tpool)
189 pthread_mutex_lock(&tpool->workq_mutex);
190 cur_pending = tpool->qsize + tpool->nactive;
192 while(tpool->qsize + tpool->nactive >= cur_pending) {
193 pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
196 pthread_mutex_unlock(&tpool->workq_mutex);
199 long tpool_timedwait(struct thread_pool *tpool, long timeout)
201 struct timespec tout_ts;
202 struct timeval tv0, tv;
203 gettimeofday(&tv0, 0);
205 long sec = timeout / 1000;
206 tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
207 tout_ts.tv_sec = tv0.tv_sec + sec;
209 pthread_mutex_lock(&tpool->workq_mutex);
210 while(tpool->nactive || tpool->qsize) {
211 if(pthread_cond_timedwait(&tpool->done_condvar,
212 &tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
216 pthread_mutex_unlock(&tpool->workq_mutex);
218 gettimeofday(&tv, 0);
219 return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
222 static void *thread_func(void *args)
224 struct thread_pool *tpool = args;
226 pthread_mutex_lock(&tpool->workq_mutex);
227 while(!tpool->should_quit) {
228 pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);
230 while(!tpool->should_quit && tpool->workq) {
231 /* grab the first job */
232 struct work_item *job = tpool->workq;
233 tpool->workq = tpool->workq->next;
235 tpool->workq_tail = 0;
238 pthread_mutex_unlock(&tpool->workq_mutex);
241 job->work(job->data);
243 job->done(job->data);
246 pthread_mutex_lock(&tpool->workq_mutex);
247 /* notify everyone interested that we're done with this job */
248 pthread_cond_broadcast(&tpool->done_condvar);
252 pthread_mutex_unlock(&tpool->workq_mutex);
258 /* The following highly platform-specific code detects the number
259 * of processors available in the system. It's used by the thread pool
260 * to autodetect how many threads to spawn.
261 * Currently works on: Linux, BSD, Darwin, and Windows.
264 #if defined(__APPLE__) && defined(__MACH__)
273 #if defined(unix) || defined(__unix__)
277 # include <sys/sysctl.h>
281 #if defined(WIN32) || defined(__WIN32__)
286 int tpool_num_processors(void)
288 #if defined(unix) || defined(__unix__)
289 # if defined(__bsd__)
290 /* BSD systems provide the num.processors through sysctl */
291 int num, mib[] = {CTL_HW, HW_NCPU};
292 size_t len = sizeof num;
294 sysctl(mib, 2, &num, &len, 0, 0);
297 # elif defined(__sgi)
298 /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
299 return sysconf(_SC_NPROC_ONLN);
301 /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
302 return sysconf(_SC_NPROCESSORS_ONLN);
303 # endif /* bsd/sgi/other */
305 #elif defined(WIN32) || defined(__WIN32__)
306 /* under windows we need to call GetSystemInfo */
308 GetSystemInfo(&info);
309 return info.dwNumberOfProcessors;