[fbgfx] / src / tpool.c
/* worker thread pool based on POSIX threads
 * author: John Tsiombikas <nuclear@member.fsf.org>
 * This code is public domain.
 */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/time.h>
#include <pthread.h>
#include "tpool.h"

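/* Example usage (a minimal sketch; it assumes tpool.h declares the functions
 * defined below and that tpool_callback is a void (*)(void*) as used by
 * thread_func; double_it is a made-up callback for illustration):
 *
 *   static void double_it(void *data) { *(int*)data *= 2; }
 *
 *   int i, nums[16];
 *   struct thread_pool *tp = tpool_create(0);   // 0: one thread per processor
 *   for(i=0; i<16; i++) {
 *       nums[i] = i;
 *       tpool_enqueue(tp, nums + i, double_it, 0);   // no "done" callback
 *   }
 *   tpool_wait(tp);      // block until the queue drains and workers go idle
 *   tpool_destroy(tp);
 */
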
struct work_item {
	void *data;
	tpool_callback work, done;
	struct work_item *next;
};

struct thread_pool {
	pthread_t *threads;
	int num_threads;

	int qsize;
	struct work_item *workq, *workq_tail;
	pthread_mutex_t workq_mutex;
	pthread_cond_t workq_condvar;

	int nactive;	/* number of active workers (not sleeping) */

	pthread_cond_t done_condvar;

	int should_quit;	/* set by tpool_destroy to make the workers exit */
	int in_batch;		/* suppress per-job wakeup broadcasts until tpool_end_batch */
};

static void *thread_func(void *args);

struct thread_pool *tpool_create(int num_threads)
{
	int i;
	struct thread_pool *tpool;

	if(!(tpool = calloc(1, sizeof *tpool))) {
		return 0;
	}
	pthread_mutex_init(&tpool->workq_mutex, 0);
	pthread_cond_init(&tpool->workq_condvar, 0);
	pthread_cond_init(&tpool->done_condvar, 0);

	if(num_threads <= 0) {
		num_threads = tpool_num_processors();
	}
	tpool->num_threads = num_threads;

	if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
		free(tpool);
		return 0;
	}
	for(i=0; i<num_threads; i++) {
		/* pthread_create returns 0 on success and an error number on failure */
		if(pthread_create(tpool->threads + i, 0, thread_func, tpool) != 0) {
			/* only join the threads that were actually created */
			tpool->num_threads = i;
			tpool_destroy(tpool);
			return 0;
		}
	}
	return tpool;
}

void tpool_destroy(struct thread_pool *tpool)
{
	int i;
	if(!tpool) return;

	tpool_clear(tpool);

	/* set the quit flag and wake all workers while holding the queue mutex,
	 * so the flag change can't race with a worker re-checking it
	 */
	pthread_mutex_lock(&tpool->workq_mutex);
	tpool->should_quit = 1;
	pthread_cond_broadcast(&tpool->workq_condvar);
	pthread_mutex_unlock(&tpool->workq_mutex);

	if(tpool->threads) {
		printf("thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
		fflush(stdout);

		for(i=0; i<tpool->num_threads; i++) {
			pthread_join(tpool->threads[i], 0);
			putchar('.');
			fflush(stdout);
		}
		putchar('\n');
		free(tpool->threads);
	}

	pthread_mutex_destroy(&tpool->workq_mutex);
	pthread_cond_destroy(&tpool->workq_condvar);
	pthread_cond_destroy(&tpool->done_condvar);
	free(tpool);
}

void tpool_begin_batch(struct thread_pool *tpool)
{
	tpool->in_batch = 1;
}

void tpool_end_batch(struct thread_pool *tpool)
{
	tpool->in_batch = 0;
	pthread_cond_broadcast(&tpool->workq_condvar);
}
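
/* A sketch of the intended batching pattern (an assumption based on the
 * in_batch flag: tpool_enqueue skips its wakeup broadcast while a batch is
 * open, so the workers are woken once by tpool_end_batch instead of once per
 * job; jobs, njobs, and job_func stand in for the caller's own data):
 *
 *   tpool_begin_batch(tp);
 *   for(i=0; i<njobs; i++) {
 *       tpool_enqueue(tp, jobs + i, job_func, 0);
 *   }
 *   tpool_end_batch(tp);    // single broadcast wakes the workers
 */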
107
108 int tpool_enqueue(struct thread_pool *tpool, void *data,
109                 tpool_callback work_func, tpool_callback done_func)
110 {
111         struct work_item *job;
112
113         if(!(job = malloc(sizeof *job))) {
114                 return -1;
115         }
116         job->work = work_func;
117         job->done = done_func;
118         job->data = data;
119         job->next = 0;
120
121         pthread_mutex_lock(&tpool->workq_mutex);
122         if(tpool->workq) {
123                 tpool->workq_tail->next = job;
124                 tpool->workq_tail = job;
125         } else {
126                 tpool->workq = tpool->workq_tail = job;
127         }
128         ++tpool->qsize;
129         pthread_mutex_unlock(&tpool->workq_mutex);
130
131         if(!tpool->in_batch) {
132                 pthread_cond_broadcast(&tpool->workq_condvar);
133         }
134         return 0;
135 }

void tpool_clear(struct thread_pool *tpool)
{
	pthread_mutex_lock(&tpool->workq_mutex);
	while(tpool->workq) {
		void *tmp = tpool->workq;
		tpool->workq = tpool->workq->next;
		free(tmp);
	}
	tpool->workq = tpool->workq_tail = 0;
	tpool->qsize = 0;
	pthread_mutex_unlock(&tpool->workq_mutex);
}

int tpool_queued_jobs(struct thread_pool *tpool)
{
	int res;
	pthread_mutex_lock(&tpool->workq_mutex);
	res = tpool->qsize;
	pthread_mutex_unlock(&tpool->workq_mutex);
	return res;
}

int tpool_active_jobs(struct thread_pool *tpool)
{
	int res;
	pthread_mutex_lock(&tpool->workq_mutex);
	res = tpool->nactive;
	pthread_mutex_unlock(&tpool->workq_mutex);
	return res;
}

int tpool_pending_jobs(struct thread_pool *tpool)
{
	int res;
	pthread_mutex_lock(&tpool->workq_mutex);
	res = tpool->qsize + tpool->nactive;
	pthread_mutex_unlock(&tpool->workq_mutex);
	return res;
}

void tpool_wait(struct thread_pool *tpool)
{
	pthread_mutex_lock(&tpool->workq_mutex);
	while(tpool->nactive || tpool->qsize) {
		pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
	}
	pthread_mutex_unlock(&tpool->workq_mutex);
}

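/* Block until at least one pending job completes: take a snapshot of the
 * pending count (queued + active) and wait for it to drop below that value.
 */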
void tpool_wait_one(struct thread_pool *tpool)
{
	int cur_pending;
	pthread_mutex_lock(&tpool->workq_mutex);
	cur_pending = tpool->qsize + tpool->nactive;
	if(cur_pending) {
		while(tpool->qsize + tpool->nactive >= cur_pending) {
			pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
		}
	}
	pthread_mutex_unlock(&tpool->workq_mutex);
}

long tpool_timedwait(struct thread_pool *tpool, long timeout)
{
	struct timespec tout_ts;
	struct timeval tv0, tv;

	gettimeofday(&tv0, 0);

	/* convert the relative timeout (msec) to an absolute timespec,
	 * carrying any nanosecond overflow into the seconds field
	 */
	tout_ts.tv_sec = tv0.tv_sec + timeout / 1000;
	tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
	if(tout_ts.tv_nsec >= 1000000000) {
		tout_ts.tv_sec += tout_ts.tv_nsec / 1000000000;
		tout_ts.tv_nsec %= 1000000000;
	}

	pthread_mutex_lock(&tpool->workq_mutex);
	while(tpool->nactive || tpool->qsize) {
		if(pthread_cond_timedwait(&tpool->done_condvar,
					&tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
			break;
		}
	}
	pthread_mutex_unlock(&tpool->workq_mutex);

	gettimeofday(&tv, 0);
	return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
}

static void *thread_func(void *args)
{
	struct thread_pool *tpool = args;

	pthread_mutex_lock(&tpool->workq_mutex);
	while(!tpool->should_quit) {
		/* only sleep if the queue is empty, so jobs enqueued before this
		 * worker reached the condition variable are not missed
		 */
		if(!tpool->workq) {
			pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);
		}

		while(!tpool->should_quit && tpool->workq) {
			/* grab the first job */
			struct work_item *job = tpool->workq;
			tpool->workq = tpool->workq->next;
			if(!tpool->workq)
				tpool->workq_tail = 0;
			++tpool->nactive;
			--tpool->qsize;
			pthread_mutex_unlock(&tpool->workq_mutex);

			/* do the job */
			job->work(job->data);
			if(job->done) {
				job->done(job->data);
			}
			/* the work item was allocated by tpool_enqueue; release it */
			free(job);

			pthread_mutex_lock(&tpool->workq_mutex);
			/* notify everyone interested that we're done with this job */
			pthread_cond_broadcast(&tpool->done_condvar);
			--tpool->nactive;
		}
	}
	pthread_mutex_unlock(&tpool->workq_mutex);

	return 0;
}


/* The following highly platform-specific code detects the number
 * of processors available in the system. It's used by the thread pool
 * to autodetect how many threads to spawn.
 * Currently works on: Linux, IRIX, BSD, Darwin, and Windows.
 */

#if defined(__APPLE__) && defined(__MACH__)
# ifndef __unix__
#  define __unix__	1
# endif	/* unix */
# ifndef __bsd__
#  define __bsd__	1
# endif	/* bsd */
#endif	/* apple */

#if defined(unix) || defined(__unix__)
#include <unistd.h>

# ifdef __bsd__
#  include <sys/sysctl.h>
# endif
#endif

#if defined(WIN32) || defined(__WIN32__)
#include <windows.h>
#endif


int tpool_num_processors(void)
{
#if defined(unix) || defined(__unix__)
# if defined(__bsd__)
	/* BSD systems provide the number of processors through sysctl */
	int num, mib[] = {CTL_HW, HW_NCPU};
	size_t len = sizeof num;

	sysctl(mib, 2, &num, &len, 0, 0);
	return num;

# elif defined(__sgi)
	/* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
	return sysconf(_SC_NPROC_ONLN);
# else
	/* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
	return sysconf(_SC_NPROCESSORS_ONLN);
# endif	/* bsd/sgi/other */

#elif defined(WIN32) || defined(__WIN32__)
	/* under windows we need to call GetSystemInfo */
	SYSTEM_INFO info;
	GetSystemInfo(&info);
	return info.dwNumberOfProcessors;
#else
	/* unknown platform: fall back to a single worker thread */
	return 1;
#endif
}