4d06570fbb059320439486e95180fe87e4a95b8b
[fbgfx] / src / tpool.c
1 #ifndef NO_THREADS
2 /* worker thread pool based on POSIX threads
3  * author: John Tsiombikas <nuclear@member.fsf.org>
4  * This code is public domain.
5  */
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <errno.h>
9 #include <unistd.h>
10 #include <sys/time.h>
11 #include <pthread.h>
12 #include "tpool.h"
13
14 struct work_item {
15         void *data;
16         tpool_callback work, done;
17         struct work_item *next;
18 };
19
20 struct thread_pool {
21         pthread_t *threads;
22         int num_threads;
23
24         int qsize;
25         struct work_item *workq, *workq_tail;
26         pthread_mutex_t workq_mutex;
27         pthread_cond_t workq_condvar;
28
29         int nactive;    /* number of active workers (not sleeping) */
30
31         pthread_cond_t done_condvar;
32
33         int should_quit;
34         int in_batch;
35 };
36
37 static void *thread_func(void *args);
38
39 struct thread_pool *tpool_create(int num_threads)
40 {
41         int i;
42         struct thread_pool *tpool;
43
44         if(!(tpool = calloc(1, sizeof *tpool))) {
45                 return 0;
46         }
47         pthread_mutex_init(&tpool->workq_mutex, 0);
48         pthread_cond_init(&tpool->workq_condvar, 0);
49         pthread_cond_init(&tpool->done_condvar, 0);
50
51         if(num_threads <= 0) {
52                 num_threads = tpool_num_processors();
53         }
54         tpool->num_threads = num_threads;
55
56         if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
57                 free(tpool);
58                 return 0;
59         }
60         for(i=0; i<num_threads; i++) {
61                 if(pthread_create(tpool->threads + i, 0, thread_func, tpool) == -1) {
62                         tpool->threads[i] = 0;
63                         tpool_destroy(tpool);
64                         return 0;
65                 }
66         }
67         return tpool;
68 }
69
70 void tpool_destroy(struct thread_pool *tpool)
71 {
72         int i;
73         if(!tpool) return;
74
75         tpool_clear(tpool);
76         tpool->should_quit = 1;
77
78         pthread_cond_broadcast(&tpool->workq_condvar);
79
80         if(tpool->threads) {
81                 printf("thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
82                 fflush(stdout);
83
84                 for(i=0; i<tpool->num_threads; i++) {
85                         pthread_join(tpool->threads[i], 0);
86                         putchar('.');
87                         fflush(stdout);
88                 }
89                 putchar('\n');
90                 free(tpool->threads);
91         }
92
93         pthread_mutex_destroy(&tpool->workq_mutex);
94         pthread_cond_destroy(&tpool->workq_condvar);
95         pthread_cond_destroy(&tpool->done_condvar);
96 }
97
98 void tpool_begin_batch(struct thread_pool *tpool)
99 {
100         tpool->in_batch = 1;
101 }
102
103 void tpool_end_batch(struct thread_pool *tpool)
104 {
105         tpool->in_batch = 0;
106         pthread_cond_broadcast(&tpool->workq_condvar);
107 }
108
109 int tpool_enqueue(struct thread_pool *tpool, void *data,
110                 tpool_callback work_func, tpool_callback done_func)
111 {
112         struct work_item *job;
113
114         if(!(job = malloc(sizeof *job))) {
115                 return -1;
116         }
117         job->work = work_func;
118         job->done = done_func;
119         job->data = data;
120         job->next = 0;
121
122         pthread_mutex_lock(&tpool->workq_mutex);
123         if(tpool->workq) {
124                 tpool->workq_tail->next = job;
125                 tpool->workq_tail = job;
126         } else {
127                 tpool->workq = tpool->workq_tail = job;
128         }
129         ++tpool->qsize;
130         pthread_mutex_unlock(&tpool->workq_mutex);
131
132         if(!tpool->in_batch) {
133                 pthread_cond_broadcast(&tpool->workq_condvar);
134         }
135         return 0;
136 }
137
138 void tpool_clear(struct thread_pool *tpool)
139 {
140         pthread_mutex_lock(&tpool->workq_mutex);
141         while(tpool->workq) {
142                 void *tmp = tpool->workq;
143                 tpool->workq = tpool->workq->next;
144                 free(tmp);
145         }
146         tpool->workq = tpool->workq_tail = 0;
147         tpool->qsize = 0;
148         pthread_mutex_unlock(&tpool->workq_mutex);
149 }
150
151 int tpool_queued_jobs(struct thread_pool *tpool)
152 {
153         int res;
154         pthread_mutex_lock(&tpool->workq_mutex);
155         res = tpool->qsize;
156         pthread_mutex_unlock(&tpool->workq_mutex);
157         return res;
158 }
159
160 int tpool_active_jobs(struct thread_pool *tpool)
161 {
162         int res;
163         pthread_mutex_lock(&tpool->workq_mutex);
164         res = tpool->nactive;
165         pthread_mutex_unlock(&tpool->workq_mutex);
166         return res;
167 }
168
169 int tpool_pending_jobs(struct thread_pool *tpool)
170 {
171         int res;
172         pthread_mutex_lock(&tpool->workq_mutex);
173         res = tpool->qsize + tpool->nactive;
174         pthread_mutex_unlock(&tpool->workq_mutex);
175         return res;
176 }
177
178 void tpool_wait(struct thread_pool *tpool)
179 {
180         pthread_mutex_lock(&tpool->workq_mutex);
181         while(tpool->nactive || tpool->qsize) {
182                 pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
183         }
184         pthread_mutex_unlock(&tpool->workq_mutex);
185 }
186
187 void tpool_wait_one(struct thread_pool *tpool)
188 {
189         int cur_pending;
190         pthread_mutex_lock(&tpool->workq_mutex);
191         cur_pending = tpool->qsize + tpool->nactive;
192         if(cur_pending) {
193                 while(tpool->qsize + tpool->nactive >= cur_pending) {
194                         pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
195                 }
196         }
197         pthread_mutex_unlock(&tpool->workq_mutex);
198 }
199
200 long tpool_timedwait(struct thread_pool *tpool, long timeout)
201 {
202         struct timespec tout_ts;
203         struct timeval tv0, tv;
204         long sec;
205
206         gettimeofday(&tv0, 0);
207
208         sec = timeout / 1000;
209         tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
210         tout_ts.tv_sec = tv0.tv_sec + sec;
211
212         pthread_mutex_lock(&tpool->workq_mutex);
213         while(tpool->nactive || tpool->qsize) {
214                 if(pthread_cond_timedwait(&tpool->done_condvar,
215                                         &tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
216                         break;
217                 }
218         }
219         pthread_mutex_unlock(&tpool->workq_mutex);
220
221         gettimeofday(&tv, 0);
222         return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
223 }
224
225 static void *thread_func(void *args)
226 {
227         struct thread_pool *tpool = args;
228
229         pthread_mutex_lock(&tpool->workq_mutex);
230         while(!tpool->should_quit) {
231                 pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);
232
233                 while(!tpool->should_quit && tpool->workq) {
234                         /* grab the first job */
235                         struct work_item *job = tpool->workq;
236                         tpool->workq = tpool->workq->next;
237                         if(!tpool->workq)
238                                 tpool->workq_tail = 0;
239                         ++tpool->nactive;
240                         --tpool->qsize;
241                         pthread_mutex_unlock(&tpool->workq_mutex);
242
243                         /* do the job */
244                         job->work(job->data);
245                         if(job->done) {
246                                 job->done(job->data);
247                         }
248
249                         pthread_mutex_lock(&tpool->workq_mutex);
250                         /* notify everyone interested that we're done with this job */
251                         pthread_cond_broadcast(&tpool->done_condvar);
252                         --tpool->nactive;
253                 }
254         }
255         pthread_mutex_unlock(&tpool->workq_mutex);
256
257         return 0;
258 }
259
260
261 /* The following highly platform-specific code detects the number
262  * of processors available in the system. It's used by the thread pool
263  * to autodetect how many threads to spawn.
264  * Currently works on: Linux, BSD, Darwin, and Windows.
265  */
266
267 #if defined(__APPLE__) && defined(__MACH__)
268 # ifndef __unix__
269 #  define __unix__      1
270 # endif /* unix */
271 # ifndef __bsd__
272 #  define __bsd__       1
273 # endif /* bsd */
274 #endif  /* apple */
275
276 #if defined(unix) || defined(__unix__)
277 #include <unistd.h>
278
279 # ifdef __bsd__
280 #  include <sys/sysctl.h>
281 # endif
282 #endif
283
284 #if defined(WIN32) || defined(__WIN32__)
285 #include <windows.h>
286 #endif
287
288
289 int tpool_num_processors(void)
290 {
291 #if defined(unix) || defined(__unix__)
292 # if defined(__bsd__)
293         /* BSD systems provide the num.processors through sysctl */
294         int num, mib[] = {CTL_HW, HW_NCPU};
295         size_t len = sizeof num;
296
297         sysctl(mib, 2, &num, &len, 0, 0);
298         return num;
299
300 # elif defined(__sgi)
301         /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
302         return sysconf(_SC_NPROC_ONLN);
303 # else
304         /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
305         return sysconf(_SC_NPROCESSORS_ONLN);
306 # endif /* bsd/sgi/other */
307
308 #elif defined(WIN32) || defined(__WIN32__)
309         /* under windows we need to call GetSystemInfo */
310         SYSTEM_INFO info;
311         GetSystemInfo(&info);
312         return info.dwNumberOfProcessors;
313 #endif
314 }
315
316 #endif  /* !def NO_THREADS */