mod_url: make cache directory if it doesn't exist (tested on UNIX only)
assman/src/tpool.c
/* worker thread pool based on POSIX threads
 * author: John Tsiombikas <nuclear@member.fsf.org>
 * This code is public domain.
 */
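/* Example usage (illustrative sketch only; load_image, image_loaded and imgfile
 * stand in for the caller's own callbacks and data, they are not part of this file):
 *
 *      void load_image(void *data);    - runs on one of the worker threads
 *      void image_loaded(void *data);  - runs on the same worker right afterwards
 *
 *      struct thread_pool *tp = ass_tpool_create(0);  - 0: one worker per processor
 *      ass_tpool_enqueue(tp, imgfile, load_image, image_loaded);
 *      ass_tpool_wait(tp);     - block until every queued job has finished
 *      ass_tpool_destroy(tp);
 */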
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>     /* intptr_t */
#include <errno.h>
#include <pthread.h>
#include "tpool.h"

#if defined(__APPLE__) && defined(__MACH__)
# ifndef __unix__
#  define __unix__      1
# endif /* unix */
# ifndef __bsd__
#  define __bsd__       1
# endif /* bsd */
#endif  /* apple */

#if defined(unix) || defined(__unix__)
#include <unistd.h>
#include <sys/time.h>

# ifdef __bsd__
#  include <sys/sysctl.h>
# endif
#endif

#if defined(WIN32) || defined(__WIN32__)
#include <windows.h>
#endif


struct work_item {
        void *data;
        tpool_callback work, done;
        struct work_item *next;
};

struct thread_data {
        int id;
        struct thread_pool *pool;
};

struct thread_pool {
        pthread_t *threads;
        struct thread_data *tdata;
        int num_threads;
        pthread_key_t idkey;

        int qsize;
        struct work_item *workq, *workq_tail;
        pthread_mutex_t workq_mutex;
        pthread_cond_t workq_condvar;

        int nactive;    /* number of active workers (not sleeping) */

        pthread_cond_t done_condvar;

        int should_quit;
        int in_batch;

        int nref;       /* reference count */

#if defined(WIN32) || defined(__WIN32__)
        HANDLE wait_event;
#else
        int wait_pipe[2];
#endif
};

static void *thread_func(void *args);
static void send_done_event(struct thread_pool *tpool);

static struct work_item *alloc_work_item(void);
static void free_work_item(struct work_item *w);

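/* Create a pool of num_threads worker threads. Passing num_threads <= 0 spawns
 * one worker per processor reported by ass_tpool_num_processors. Returns null
 * on failure.
 */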
struct thread_pool *ass_tpool_create(int num_threads)
{
        int i;
        struct thread_pool *tpool;

        if(!(tpool = calloc(1, sizeof *tpool))) {
                return 0;
        }
        pthread_mutex_init(&tpool->workq_mutex, 0);
        pthread_cond_init(&tpool->workq_condvar, 0);
        pthread_cond_init(&tpool->done_condvar, 0);
        pthread_key_create(&tpool->idkey, 0);

        /* mark the creating thread with an out-of-range id */
        pthread_setspecific(tpool->idkey, (void*)0xffffffff);

#if !defined(WIN32) && !defined(__WIN32__)
        tpool->wait_pipe[0] = tpool->wait_pipe[1] = -1;
#endif

        if(num_threads <= 0) {
                num_threads = ass_tpool_num_processors();
        }
        tpool->num_threads = num_threads;

        if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
                free(tpool);
                return 0;
        }
        if(!(tpool->tdata = malloc(num_threads * sizeof *tpool->tdata))) {
                free(tpool->threads);
                free(tpool);
                return 0;
        }

        for(i=0; i<num_threads; i++) {
                tpool->tdata[i].id = i;
                tpool->tdata[i].pool = tpool;

                /* pthread_create returns 0 on success or an error number, never -1 */
                if(pthread_create(tpool->threads + i, 0, thread_func, tpool->tdata + i) != 0) {
                        /*tpool->threads[i] = 0;*/
                        ass_tpool_destroy(tpool);
                        return 0;
                }
        }
        return tpool;
}

void ass_tpool_destroy(struct thread_pool *tpool)
{
        int i;
        if(!tpool) return;

        ass_tpool_clear(tpool);

        /* set the quit flag and wake the workers while holding the queue mutex,
         * so that a worker can't check the flag and then miss the wakeup
         */
        pthread_mutex_lock(&tpool->workq_mutex);
        tpool->should_quit = 1;
        pthread_cond_broadcast(&tpool->workq_condvar);
        pthread_mutex_unlock(&tpool->workq_mutex);

        if(tpool->threads) {
                printf("thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
                fflush(stdout);

                for(i=0; i<tpool->num_threads; i++) {
                        pthread_join(tpool->threads[i], 0);
                        putchar('.');
                        fflush(stdout);
                }
                putchar('\n');
                free(tpool->threads);
        }
        free(tpool->tdata);

        /* also wake up anyone waiting on the wait* calls */
        tpool->nactive = 0;
        pthread_cond_broadcast(&tpool->done_condvar);
        send_done_event(tpool);

        pthread_mutex_destroy(&tpool->workq_mutex);
        pthread_cond_destroy(&tpool->workq_condvar);
        pthread_cond_destroy(&tpool->done_condvar);
        pthread_key_delete(tpool->idkey);

#if defined(WIN32) || defined(__WIN32__)
        if(tpool->wait_event) {
                CloseHandle(tpool->wait_event);
        }
#else
        if(tpool->wait_pipe[0] >= 0) {
                close(tpool->wait_pipe[0]);
                close(tpool->wait_pipe[1]);
        }
#endif

        free(tpool);    /* the pool struct itself was allocated by ass_tpool_create */
}

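/* Reference counting helpers. The count is a plain int with no locking, so
 * addref/release are expected to be called from a single thread, or with
 * external synchronization if the pool is shared.
 */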
int ass_tpool_addref(struct thread_pool *tpool)
{
        return ++tpool->nref;
}

int ass_tpool_release(struct thread_pool *tpool)
{
        if(--tpool->nref <= 0) {
                ass_tpool_destroy(tpool);
                return 0;
        }
        return tpool->nref;
}

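/* Batching: between begin_batch and end_batch, ass_tpool_enqueue only queues
 * jobs without waking the workers; end_batch broadcasts once, so the workers
 * pick up the whole batch instead of being woken separately for every job.
 */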
void ass_tpool_begin_batch(struct thread_pool *tpool)
{
        tpool->in_batch = 1;
}

void ass_tpool_end_batch(struct thread_pool *tpool)
{
        tpool->in_batch = 0;
        pthread_cond_broadcast(&tpool->workq_condvar);
}

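/* Append a job to the work queue. work_func is called on a worker thread with
 * data as its argument; if done_func is non-null it's called on the same worker
 * right after work_func returns. Returns 0 on success, or -1 if the work item
 * could not be allocated.
 */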
int ass_tpool_enqueue(struct thread_pool *tpool, void *data,
                tpool_callback work_func, tpool_callback done_func)
{
        struct work_item *job;

        if(!(job = alloc_work_item())) {
                return -1;
        }
        job->work = work_func;
        job->done = done_func;
        job->data = data;
        job->next = 0;

        pthread_mutex_lock(&tpool->workq_mutex);
        if(tpool->workq) {
                tpool->workq_tail->next = job;
                tpool->workq_tail = job;
        } else {
                tpool->workq = tpool->workq_tail = job;
        }
        ++tpool->qsize;
        pthread_mutex_unlock(&tpool->workq_mutex);

        if(!tpool->in_batch) {
                pthread_cond_broadcast(&tpool->workq_condvar);
        }
        return 0;
}

void ass_tpool_clear(struct thread_pool *tpool)
{
        pthread_mutex_lock(&tpool->workq_mutex);
        while(tpool->workq) {
                void *tmp = tpool->workq;
                tpool->workq = tpool->workq->next;
                free(tmp);
        }
        tpool->workq = tpool->workq_tail = 0;
        tpool->qsize = 0;
        pthread_mutex_unlock(&tpool->workq_mutex);
}

int ass_tpool_queued_jobs(struct thread_pool *tpool)
{
        int res;
        pthread_mutex_lock(&tpool->workq_mutex);
        res = tpool->qsize;
        pthread_mutex_unlock(&tpool->workq_mutex);
        return res;
}

int ass_tpool_active_jobs(struct thread_pool *tpool)
{
        int res;
        pthread_mutex_lock(&tpool->workq_mutex);
        res = tpool->nactive;
        pthread_mutex_unlock(&tpool->workq_mutex);
        return res;
}

int ass_tpool_pending_jobs(struct thread_pool *tpool)
{
        int res;
        pthread_mutex_lock(&tpool->workq_mutex);
        res = tpool->qsize + tpool->nactive;
        pthread_mutex_unlock(&tpool->workq_mutex);
        return res;
}

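/* Block the calling thread until the queue is empty and no worker is busy.
 * ass_tpool_wait_pending is similar, but returns as soon as the number of
 * queued plus active jobs drops to pending_target or below.
 */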
void ass_tpool_wait(struct thread_pool *tpool)
{
        pthread_mutex_lock(&tpool->workq_mutex);
        while(tpool->nactive || tpool->qsize) {
                pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
        }
        pthread_mutex_unlock(&tpool->workq_mutex);
}

void ass_tpool_wait_pending(struct thread_pool *tpool, int pending_target)
{
        pthread_mutex_lock(&tpool->workq_mutex);
        while(tpool->qsize + tpool->nactive > pending_target) {
                pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
        }
        pthread_mutex_unlock(&tpool->workq_mutex);
}

#if defined(WIN32) || defined(__WIN32__)
long ass_tpool_timedwait(struct thread_pool *tpool, long timeout)
{
        fprintf(stderr, "tpool_timedwait currently unimplemented on windows\n");
        abort();
        return 0;
}

/* TODO: does this actually work with MinGW? */
int ass_tpool_get_wait_fd(struct thread_pool *tpool)
{
        static int once;
        if(!once) {
                once = 1;
                fprintf(stderr, "warning: ass_tpool_get_wait_fd call on Windows does nothing\n");
        }
        return 0;
}

void *ass_tpool_get_wait_handle(struct thread_pool *tpool)
{
        if(!tpool->wait_event) {
                if(!(tpool->wait_event = CreateEvent(0, 0, 0, 0))) {
                        return 0;
                }
        }
        return tpool->wait_event;
}

static void send_done_event(struct thread_pool *tpool)
{
        if(tpool->wait_event) {
                SetEvent(tpool->wait_event);
        }
}

#else           /* UNIX */

long ass_tpool_timedwait(struct thread_pool *tpool, long timeout)
{
        struct timespec tout_ts;
        struct timeval tv0, tv;
        long sec;

        gettimeofday(&tv0, 0);

        sec = timeout / 1000;
        tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
        tout_ts.tv_sec = tv0.tv_sec + sec;
        /* keep tv_nsec below one second, otherwise pthread_cond_timedwait can fail with EINVAL */
        if(tout_ts.tv_nsec >= 1000000000) {
                tout_ts.tv_sec += tout_ts.tv_nsec / 1000000000;
                tout_ts.tv_nsec %= 1000000000;
        }

        pthread_mutex_lock(&tpool->workq_mutex);
        while(tpool->nactive || tpool->qsize) {
                if(pthread_cond_timedwait(&tpool->done_condvar,
                                        &tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
                        break;
                }
        }
        pthread_mutex_unlock(&tpool->workq_mutex);

        /* report how long we actually waited, in milliseconds */
        gettimeofday(&tv, 0);
        return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
}

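/* Return a file descriptor which becomes readable whenever a job completes, so
 * the pool can be hooked into a select/poll/epoll loop. The pipe is created
 * lazily on the first call. Rough usage sketch (illustrative only, not part of
 * the library; assumes the read end has been switched to O_NONBLOCK so the
 * drain loop doesn't block):
 *
 *      struct pollfd pfd;
 *      char buf[64];
 *      pfd.fd = ass_tpool_get_wait_fd(tp);
 *      pfd.events = POLLIN;
 *      if(poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN)) {
 *              while(read(pfd.fd, buf, sizeof buf) > 0);
 *              ... collect finished work here ...
 *      }
 */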
int ass_tpool_get_wait_fd(struct thread_pool *tpool)
{
        if(tpool->wait_pipe[0] < 0) {
                if(pipe(tpool->wait_pipe) == -1) {
                        return -1;
                }
        }
        return tpool->wait_pipe[0];
}

void *ass_tpool_get_wait_handle(struct thread_pool *tpool)
{
        static int once;
        if(!once) {
                once = 1;
                fprintf(stderr, "warning: ass_tpool_get_wait_handle call on UNIX does nothing\n");
        }
        return 0;
}

static void send_done_event(struct thread_pool *tpool)
{
        if(tpool->wait_pipe[1] >= 0) {
                /* write a single dummy byte to make the wait fd readable */
                write(tpool->wait_pipe[1], tpool, 1);
        }
}
#endif  /* WIN32/UNIX */

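/* Worker thread entry point. Each worker sleeps on workq_condvar; when woken
 * it pops jobs off the queue one at a time, runs them with the queue mutex
 * released, and signals done_condvar (and the wait fd/event) after each job,
 * until the pool asks it to quit.
 */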
static void *thread_func(void *args)
{
        struct thread_data *tdata = args;
        struct thread_pool *tpool = tdata->pool;

        pthread_setspecific(tpool->idkey, (void*)(intptr_t)tdata->id);

        pthread_mutex_lock(&tpool->workq_mutex);
        while(!tpool->should_quit) {
                if(!tpool->workq) {
                        pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);
                        if(tpool->should_quit) break;
                }

                while(!tpool->should_quit && tpool->workq) {
                        /* grab the first job */
                        struct work_item *job = tpool->workq;
                        tpool->workq = tpool->workq->next;
                        if(!tpool->workq)
                                tpool->workq_tail = 0;
                        ++tpool->nactive;
                        --tpool->qsize;
                        pthread_mutex_unlock(&tpool->workq_mutex);

                        /* do the job */
                        job->work(job->data);
                        if(job->done) {
                                job->done(job->data);
                        }
                        free_work_item(job);

                        pthread_mutex_lock(&tpool->workq_mutex);
                        /* notify everyone interested that we're done with this job */
                        pthread_cond_broadcast(&tpool->done_condvar);
                        send_done_event(tpool);
                        --tpool->nactive;
                }
        }
        pthread_mutex_unlock(&tpool->workq_mutex);

        return 0;
}

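/* Return the index of the calling worker thread (0 to num_threads - 1), or -1
 * when called from the thread that created the pool, which is tagged with an
 * out-of-range id at creation time.
 */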
int ass_tpool_thread_id(struct thread_pool *tpool)
{
        int id = (intptr_t)pthread_getspecific(tpool->idkey);
        if(id >= tpool->num_threads) {
                return -1;
        }
        return id;
}


/* The following highly platform-specific code detects the number
 * of processors available in the system. It's used by the thread pool
 * to autodetect how many threads to spawn.
 * Currently works on: Linux, BSD, Darwin, IRIX, and Windows.
 */
int ass_tpool_num_processors(void)
{
#if defined(unix) || defined(__unix__)
# if defined(__bsd__)
        /* BSD systems report the number of processors through sysctl */
        int num, mib[] = {CTL_HW, HW_NCPU};
        size_t len = sizeof num;

        sysctl(mib, 2, &num, &len, 0, 0);
        return num;

# elif defined(__sgi)
        /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
        return sysconf(_SC_NPROC_ONLN);
# else
        /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
        return sysconf(_SC_NPROCESSORS_ONLN);
# endif /* bsd/sgi/other */

#elif defined(WIN32) || defined(__WIN32__)
        /* under windows we need to call GetSystemInfo */
        SYSTEM_INFO info;
        GetSystemInfo(&info);
        return info.dwNumberOfProcessors;
#else
        return 1;       /* unknown system: assume a single processor */
#endif
}

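/* Completed work_item structs are recycled through a small global free list
 * shared by all pools, instead of being handed straight back to malloc; at
 * most MAX_WPOOL_SIZE items are kept around.
 */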
#define MAX_WPOOL_SIZE  64
static pthread_mutex_t wpool_lock = PTHREAD_MUTEX_INITIALIZER;
static struct work_item *wpool;
static int wpool_size;

/* work item allocator */
static struct work_item *alloc_work_item(void)
{
        struct work_item *w;

        pthread_mutex_lock(&wpool_lock);
        if(!wpool) {
                pthread_mutex_unlock(&wpool_lock);
                return malloc(sizeof(struct work_item));
        }

        w = wpool;
        wpool = wpool->next;
        --wpool_size;
        pthread_mutex_unlock(&wpool_lock);
        return w;
}

static void free_work_item(struct work_item *w)
{
        pthread_mutex_lock(&wpool_lock);
        if(wpool_size >= MAX_WPOOL_SIZE) {
                free(w);
        } else {
                w->next = wpool;
                wpool = w;
                ++wpool_size;
        }
        pthread_mutex_unlock(&wpool_lock);
}