- added libdrawtext
[demo_prior] / libs / drawtext / src / tpool.c
1 /* worker thread pool based on POSIX threads
2  * author: John Tsiombikas <nuclear@member.fsf.org>
3  * This code is public domain.
4  */
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <errno.h>
8
9 #ifdef USE_THREADS
10 #include <unistd.h>
11 #include <sys/time.h>
12 #include <pthread.h>
13 #include "tpool.h"
14
15 struct work_item {
16         void *data;
17         dtx_tpool_callback work, done;
18         struct work_item *next;
19 };
20
21 struct dtx_thread_pool {
22         pthread_t *threads;
23         int num_threads;
24
25         int qsize;
26         struct work_item *workq, *workq_tail;
27         pthread_mutex_t workq_mutex;
28         pthread_cond_t workq_condvar;
29
30         int nactive;    /* number of active workers (not sleeping) */
31
32         pthread_cond_t done_condvar;
33
34         int should_quit;
35         int in_batch;
36 };
37
38 static void *thread_func(void *args);
39
40 struct dtx_thread_pool *dtx_tpool_create(int num_threads)
41 {
42         int i;
43         struct dtx_thread_pool *tpool;
44
45         if(!(tpool = calloc(1, sizeof *tpool))) {
46                 return 0;
47         }
48         pthread_mutex_init(&tpool->workq_mutex, 0);
49         pthread_cond_init(&tpool->workq_condvar, 0);
50         pthread_cond_init(&tpool->done_condvar, 0);
51
52         if(num_threads <= 0) {
53                 num_threads = dtx_tpool_num_processors();
54         }
55         tpool->num_threads = num_threads;
56
57         if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
58                 free(tpool);
59                 return 0;
60         }
61         for(i=0; i<num_threads; i++) {
62                 if(pthread_create(tpool->threads + i, 0, thread_func, tpool) == -1) {
63                         tpool->threads[i] = 0;
64                         dtx_tpool_destroy(tpool);
65                         return 0;
66                 }
67         }
68         return tpool;
69 }
70
71 void dtx_tpool_destroy(struct dtx_thread_pool *tpool)
72 {
73         int i;
74         if(!tpool) return;
75
76         dtx_tpool_clear(tpool);
77         tpool->should_quit = 1;
78
79         pthread_cond_broadcast(&tpool->workq_condvar);
80
81         if(tpool->threads) {
82                 printf("dtx_thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
83                 fflush(stdout);
84
85                 for(i=0; i<tpool->num_threads; i++) {
86                         pthread_join(tpool->threads[i], 0);
87                         putchar('.');
88                         fflush(stdout);
89                 }
90                 putchar('\n');
91                 free(tpool->threads);
92         }
93
94         pthread_mutex_destroy(&tpool->workq_mutex);
95         pthread_cond_destroy(&tpool->workq_condvar);
96         pthread_cond_destroy(&tpool->done_condvar);
97 }
98
99 void dtx_tpool_begin_batch(struct dtx_thread_pool *tpool)
100 {
101         tpool->in_batch = 1;
102 }
103
104 void dtx_tpool_end_batch(struct dtx_thread_pool *tpool)
105 {
106         tpool->in_batch = 0;
107         pthread_cond_broadcast(&tpool->workq_condvar);
108 }
109
110 int dtx_tpool_enqueue(struct dtx_thread_pool *tpool, void *data,
111                 dtx_tpool_callback work_func, dtx_tpool_callback done_func)
112 {
113         struct work_item *job;
114
115         if(!(job = malloc(sizeof *job))) {
116                 return -1;
117         }
118         job->work = work_func;
119         job->done = done_func;
120         job->data = data;
121         job->next = 0;
122
123         pthread_mutex_lock(&tpool->workq_mutex);
124         if(tpool->workq) {
125                 tpool->workq_tail->next = job;
126                 tpool->workq_tail = job;
127         } else {
128                 tpool->workq = tpool->workq_tail = job;
129         }
130         ++tpool->qsize;
131         pthread_mutex_unlock(&tpool->workq_mutex);
132
133         if(!tpool->in_batch) {
134                 pthread_cond_broadcast(&tpool->workq_condvar);
135         }
136         return 0;
137 }
138
139 void dtx_tpool_clear(struct dtx_thread_pool *tpool)
140 {
141         pthread_mutex_lock(&tpool->workq_mutex);
142         while(tpool->workq) {
143                 void *tmp = tpool->workq;
144                 tpool->workq = tpool->workq->next;
145                 free(tmp);
146         }
147         tpool->workq = tpool->workq_tail = 0;
148         tpool->qsize = 0;
149         pthread_mutex_unlock(&tpool->workq_mutex);
150 }
151
152 int dtx_tpool_queued_jobs(struct dtx_thread_pool *tpool)
153 {
154         int res;
155         pthread_mutex_lock(&tpool->workq_mutex);
156         res = tpool->qsize;
157         pthread_mutex_unlock(&tpool->workq_mutex);
158         return res;
159 }
160
161 int dtx_tpool_active_jobs(struct dtx_thread_pool *tpool)
162 {
163         int res;
164         pthread_mutex_lock(&tpool->workq_mutex);
165         res = tpool->nactive;
166         pthread_mutex_unlock(&tpool->workq_mutex);
167         return res;
168 }
169
170 int dtx_tpool_pending_jobs(struct dtx_thread_pool *tpool)
171 {
172         int res;
173         pthread_mutex_lock(&tpool->workq_mutex);
174         res = tpool->qsize + tpool->nactive;
175         pthread_mutex_unlock(&tpool->workq_mutex);
176         return res;
177 }
178
179 void dtx_tpool_wait(struct dtx_thread_pool *tpool)
180 {
181         pthread_mutex_lock(&tpool->workq_mutex);
182         while(tpool->nactive || tpool->qsize) {
183                 pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
184         }
185         pthread_mutex_unlock(&tpool->workq_mutex);
186 }
187
188 void dtx_tpool_wait_one(struct dtx_thread_pool *tpool)
189 {
190         int cur_pending;
191         pthread_mutex_lock(&tpool->workq_mutex);
192         cur_pending = tpool->qsize + tpool->nactive;
193         if(cur_pending) {
194                 while(tpool->qsize + tpool->nactive >= cur_pending) {
195                         pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
196                 }
197         }
198         pthread_mutex_unlock(&tpool->workq_mutex);
199 }
200
201 long dtx_tpool_timedwait(struct dtx_thread_pool *tpool, long timeout)
202 {
203         long sec;
204         struct timespec tout_ts;
205         struct timeval tv0, tv;
206         gettimeofday(&tv0, 0);
207
208         sec = timeout / 1000;
209         tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
210         tout_ts.tv_sec = tv0.tv_sec + sec;
211
212         pthread_mutex_lock(&tpool->workq_mutex);
213         while(tpool->nactive || tpool->qsize) {
214                 if(pthread_cond_timedwait(&tpool->done_condvar,
215                                         &tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
216                         break;
217                 }
218         }
219         pthread_mutex_unlock(&tpool->workq_mutex);
220
221         gettimeofday(&tv, 0);
222         return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
223 }
224
225 static void *thread_func(void *args)
226 {
227         struct dtx_thread_pool *tpool = args;
228
229         pthread_mutex_lock(&tpool->workq_mutex);
230         while(!tpool->should_quit) {
231                 pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);
232
233                 while(!tpool->should_quit && tpool->workq) {
234                         /* grab the first job */
235                         struct work_item *job = tpool->workq;
236                         tpool->workq = tpool->workq->next;
237                         if(!tpool->workq)
238                                 tpool->workq_tail = 0;
239                         ++tpool->nactive;
240                         --tpool->qsize;
241                         pthread_mutex_unlock(&tpool->workq_mutex);
242
243                         /* do the job */
244                         job->work(job->data);
245                         if(job->done) {
246                                 job->done(job->data);
247                         }
248
249                         pthread_mutex_lock(&tpool->workq_mutex);
250                         /* notify everyone interested that we're done with this job */
251                         pthread_cond_broadcast(&tpool->done_condvar);
252                         --tpool->nactive;
253                 }
254         }
255         pthread_mutex_unlock(&tpool->workq_mutex);
256
257         return 0;
258 }
259 #endif  /* USE_THREADS */
260
261 /* The following highly platform-specific code detects the number
262  * of processors available in the system. It's used by the thread pool
263  * to autodetect how many threads to spawn.
264  * Currently works on: Linux, BSD, Darwin, and Windows.
265  */
266
267 #if defined(__APPLE__) && defined(__MACH__)
268 # ifndef __unix__
269 #  define __unix__      1
270 # endif /* unix */
271 # ifndef __bsd__
272 #  define __bsd__       1
273 # endif /* bsd */
274 #endif  /* apple */
275
276 #if defined(unix) || defined(__unix__)
277 #include <unistd.h>
278
279 # ifdef __bsd__
280 #  include <sys/sysctl.h>
281 # endif
282 #endif
283
284 #if defined(WIN32) || defined(__WIN32__)
285 #include <windows.h>
286 #endif
287
288
289 int dtx_tpool_num_processors(void)
290 {
291 #if defined(unix) || defined(__unix__)
292 # if defined(__bsd__)
293         /* BSD systems provide the num.processors through sysctl */
294         int num, mib[] = {CTL_HW, HW_NCPU};
295         size_t len = sizeof num;
296
297         sysctl(mib, 2, &num, &len, 0, 0);
298         return num;
299
300 # elif defined(__sgi)
301         /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
302         return sysconf(_SC_NPROC_ONLN);
303 # else
304         /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
305         return sysconf(_SC_NPROCESSORS_ONLN);
306 # endif /* bsd/sgi/other */
307
308 #elif defined(WIN32) || defined(__WIN32__)
309         /* under windows we need to call GetSystemInfo */
310         SYSTEM_INFO info;
311         GetSystemInfo(&info);
312         return info.dwNumberOfProcessors;
313 #endif
314 }