done
[fbgfx] / src / tpool.c
1 /* worker thread pool based on POSIX threads
2  * author: John Tsiombikas <nuclear@member.fsf.org>
3  * This code is public domain.
4  */
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <errno.h>
8 #include <unistd.h>
9 #include <sys/time.h>
10 #include <pthread.h>
11 #include "tpool.h"
12
13 struct work_item {
14         void *data;
15         tpool_callback work, done;
16         struct work_item *next;
17 };
18
19 struct thread_pool {
20         pthread_t *threads;
21         int num_threads;
22
23         int qsize;
24         struct work_item *workq, *workq_tail;
25         pthread_mutex_t workq_mutex;
26         pthread_cond_t workq_condvar;
27
28         int nactive;    /* number of active workers (not sleeping) */
29
30         pthread_cond_t done_condvar;
31
32         int should_quit;
33         int in_batch;
34 };
35
36 static void *thread_func(void *args);
37
38 struct thread_pool *tpool_create(int num_threads)
39 {
40         int i;
41         struct thread_pool *tpool;
42
43         if(!(tpool = calloc(1, sizeof *tpool))) {
44                 return 0;
45         }
46         pthread_mutex_init(&tpool->workq_mutex, 0);
47         pthread_cond_init(&tpool->workq_condvar, 0);
48         pthread_cond_init(&tpool->done_condvar, 0);
49
50         if(num_threads <= 0) {
51                 num_threads = tpool_num_processors();
52         }
53         tpool->num_threads = num_threads;
54
55         if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
56                 free(tpool);
57                 return 0;
58         }
59         for(i=0; i<num_threads; i++) {
60                 if(pthread_create(tpool->threads + i, 0, thread_func, tpool) == -1) {
61                         tpool->threads[i] = 0;
62                         tpool_destroy(tpool);
63                         return 0;
64                 }
65         }
66         return tpool;
67 }
68
69 void tpool_destroy(struct thread_pool *tpool)
70 {
71         int i;
72         if(!tpool) return;
73
74         tpool_clear(tpool);
75         tpool->should_quit = 1;
76
77         pthread_cond_broadcast(&tpool->workq_condvar);
78
79         if(tpool->threads) {
80                 printf("thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
81                 fflush(stdout);
82
83                 for(i=0; i<tpool->num_threads; i++) {
84                         pthread_join(tpool->threads[i], 0);
85                         putchar('.');
86                         fflush(stdout);
87                 }
88                 putchar('\n');
89                 free(tpool->threads);
90         }
91
92         pthread_mutex_destroy(&tpool->workq_mutex);
93         pthread_cond_destroy(&tpool->workq_condvar);
94         pthread_cond_destroy(&tpool->done_condvar);
95 }
96
97 void tpool_begin_batch(struct thread_pool *tpool)
98 {
99         tpool->in_batch = 1;
100 }
101
102 void tpool_end_batch(struct thread_pool *tpool)
103 {
104         tpool->in_batch = 0;
105         pthread_cond_broadcast(&tpool->workq_condvar);
106 }
107
108 int tpool_enqueue(struct thread_pool *tpool, void *data,
109                 tpool_callback work_func, tpool_callback done_func)
110 {
111         struct work_item *job;
112
113         if(!(job = malloc(sizeof *job))) {
114                 return -1;
115         }
116         job->work = work_func;
117         job->done = done_func;
118         job->data = data;
119         job->next = 0;
120
121         pthread_mutex_lock(&tpool->workq_mutex);
122         if(tpool->workq) {
123                 tpool->workq_tail->next = job;
124                 tpool->workq_tail = job;
125         } else {
126                 tpool->workq = tpool->workq_tail = job;
127         }
128         ++tpool->qsize;
129         pthread_mutex_unlock(&tpool->workq_mutex);
130
131         if(!tpool->in_batch) {
132                 pthread_cond_broadcast(&tpool->workq_condvar);
133         }
134         return 0;
135 }
136
137 void tpool_clear(struct thread_pool *tpool)
138 {
139         pthread_mutex_lock(&tpool->workq_mutex);
140         while(tpool->workq) {
141                 void *tmp = tpool->workq;
142                 tpool->workq = tpool->workq->next;
143                 free(tmp);
144         }
145         tpool->workq = tpool->workq_tail = 0;
146         tpool->qsize = 0;
147         pthread_mutex_unlock(&tpool->workq_mutex);
148 }
149
150 int tpool_queued_jobs(struct thread_pool *tpool)
151 {
152         int res;
153         pthread_mutex_lock(&tpool->workq_mutex);
154         res = tpool->qsize;
155         pthread_mutex_unlock(&tpool->workq_mutex);
156         return res;
157 }
158
159 int tpool_active_jobs(struct thread_pool *tpool)
160 {
161         int res;
162         pthread_mutex_lock(&tpool->workq_mutex);
163         res = tpool->nactive;
164         pthread_mutex_unlock(&tpool->workq_mutex);
165         return res;
166 }
167
168 int tpool_pending_jobs(struct thread_pool *tpool)
169 {
170         int res;
171         pthread_mutex_lock(&tpool->workq_mutex);
172         res = tpool->qsize + tpool->nactive;
173         pthread_mutex_unlock(&tpool->workq_mutex);
174         return res;
175 }
176
177 void tpool_wait(struct thread_pool *tpool)
178 {
179         pthread_mutex_lock(&tpool->workq_mutex);
180         while(tpool->nactive || tpool->qsize) {
181                 pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
182         }
183         pthread_mutex_unlock(&tpool->workq_mutex);
184 }
185
186 void tpool_wait_one(struct thread_pool *tpool)
187 {
188         int cur_pending;
189         pthread_mutex_lock(&tpool->workq_mutex);
190         cur_pending = tpool->qsize + tpool->nactive;
191         if(cur_pending) {
192                 while(tpool->qsize + tpool->nactive >= cur_pending) {
193                         pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
194                 }
195         }
196         pthread_mutex_unlock(&tpool->workq_mutex);
197 }
198
199 long tpool_timedwait(struct thread_pool *tpool, long timeout)
200 {
201         struct timespec tout_ts;
202         struct timeval tv0, tv;
203         gettimeofday(&tv0, 0);
204
205         long sec = timeout / 1000;
206         tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
207         tout_ts.tv_sec = tv0.tv_sec + sec;
208
209         pthread_mutex_lock(&tpool->workq_mutex);
210         while(tpool->nactive || tpool->qsize) {
211                 if(pthread_cond_timedwait(&tpool->done_condvar,
212                                         &tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
213                         break;
214                 }
215         }
216         pthread_mutex_unlock(&tpool->workq_mutex);
217
218         gettimeofday(&tv, 0);
219         return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
220 }
221
222 static void *thread_func(void *args)
223 {
224         struct thread_pool *tpool = args;
225
226         pthread_mutex_lock(&tpool->workq_mutex);
227         while(!tpool->should_quit) {
228                 pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);
229
230                 while(!tpool->should_quit && tpool->workq) {
231                         /* grab the first job */
232                         struct work_item *job = tpool->workq;
233                         tpool->workq = tpool->workq->next;
234                         if(!tpool->workq)
235                                 tpool->workq_tail = 0;
236                         ++tpool->nactive;
237                         --tpool->qsize;
238                         pthread_mutex_unlock(&tpool->workq_mutex);
239
240                         /* do the job */
241                         job->work(job->data);
242                         if(job->done) {
243                                 job->done(job->data);
244                         }
245
246                         pthread_mutex_lock(&tpool->workq_mutex);
247                         /* notify everyone interested that we're done with this job */
248                         pthread_cond_broadcast(&tpool->done_condvar);
249                         --tpool->nactive;
250                 }
251         }
252         pthread_mutex_unlock(&tpool->workq_mutex);
253
254         return 0;
255 }
256
257
258 /* The following highly platform-specific code detects the number
259  * of processors available in the system. It's used by the thread pool
260  * to autodetect how many threads to spawn.
261  * Currently works on: Linux, BSD, Darwin, and Windows.
262  */
263
264 #if defined(__APPLE__) && defined(__MACH__)
265 # ifndef __unix__
266 #  define __unix__      1
267 # endif /* unix */
268 # ifndef __bsd__
269 #  define __bsd__       1
270 # endif /* bsd */
271 #endif  /* apple */
272
273 #if defined(unix) || defined(__unix__)
274 #include <unistd.h>
275
276 # ifdef __bsd__
277 #  include <sys/sysctl.h>
278 # endif
279 #endif
280
281 #if defined(WIN32) || defined(__WIN32__)
282 #include <windows.h>
283 #endif
284
285
286 int tpool_num_processors(void)
287 {
288 #if defined(unix) || defined(__unix__)
289 # if defined(__bsd__)
290         /* BSD systems provide the num.processors through sysctl */
291         int num, mib[] = {CTL_HW, HW_NCPU};
292         size_t len = sizeof num;
293
294         sysctl(mib, 2, &num, &len, 0, 0);
295         return num;
296
297 # elif defined(__sgi)
298         /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
299         return sysconf(_SC_NPROC_ONLN);
300 # else
301         /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
302         return sysconf(_SC_NPROCESSORS_ONLN);
303 # endif /* bsd/sgi/other */
304
305 #elif defined(WIN32) || defined(__WIN32__)
306         /* under windows we need to call GetSystemInfo */
307         SYSTEM_INFO info;
308         GetSystemInfo(&info);
309         return info.dwNumberOfProcessors;
310 #endif
311 }