mod_url progress
authorJohn Tsiombikas <nuclear@member.fsf.org>
Wed, 26 Sep 2018 05:56:07 +0000 (08:56 +0300)
committerJohn Tsiombikas <nuclear@member.fsf.org>
Wed, 26 Sep 2018 05:56:07 +0000 (08:56 +0300)
.gitignore
Makefile
src/md4.c [new file with mode: 0644]
src/md4.h [new file with mode: 0644]
src/mod_url.c
src/tpool.c [new file with mode: 0644]
src/tpool.h [new file with mode: 0644]

index 17859ab..6592a48 100644 (file)
@@ -3,3 +3,4 @@
 *.swp
 *.a
 *.so*
+examples/asscat/asscat
index ab2a248..c3abbe7 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -15,9 +15,11 @@ shared = -shared -Wl,-soname,$(soname)
 warn = -pedantic -Wall
 dbg = -g
 opt = -O0
+def = -DBUILD_MOD_URL
+pic = -fPIC
 
-CFLAGS = $(warn) $(dbg) $(opt) $(inc)
-LDFLAGS = -lcurl
+CFLAGS = $(warn) $(dbg) $(opt) $(pic) $(def) $(inc) -pthread
+LDFLAGS = -pthread -lpthread -lcurl
 
 .PHONY: all
 all: $(lib_so) $(lib_a) $(soname) $(devlink)
diff --git a/src/md4.c b/src/md4.c
new file mode 100644 (file)
index 0000000..e135d1b
--- /dev/null
+++ b/src/md4.c
@@ -0,0 +1,259 @@
+/* MD4C.C - RSA Data Security, Inc., MD4 message-digest algorithm
+ */
+
+/* Copyright (C) 1990-2, RSA Data Security, Inc. All rights reserved.
+
+   License to copy and use this software is granted provided that it
+   is identified as the "RSA Data Security, Inc. MD4 Message-Digest
+   Algorithm" in all material mentioning or referencing this software
+   or this function.
+
+   License is also granted to make and use derivative works provided
+   that such works are identified as "derived from the RSA Data
+   Security, Inc. MD4 Message-Digest Algorithm" in all material
+   mentioning or referencing the derived work.
+
+   RSA Data Security, Inc. makes no representations concerning either
+   the merchantability of this software or the suitability of this
+   software for any particular purpose. It is provided "as is"
+   without express or implied warranty of any kind.
+
+   These notices must be retained in any copies of any part of this
+   documentation and/or software.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include "md4.h"
+
+/* Constants for MD4Transform routine.
+ */
+#define S11 3
+#define S12 7
+#define S13 11
+#define S14 19
+#define S21 3
+#define S22 5
+#define S23 9
+#define S24 13
+#define S31 3
+#define S32 9
+#define S33 11
+#define S34 15
+
+static void MD4Transform (uint32_t [4], unsigned char [64]);
+static void Encode (unsigned char *, uint32_t *, unsigned int);
+static void Decode (uint32_t *, unsigned char *, unsigned int);
+
+static unsigned char PADDING[64] = {
+  0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+};
+
+/* F, G and H are basic MD4 functions.
+ */
+#define F(x, y, z) (((x) & (y)) | ((~x) & (z)))
+#define G(x, y, z) (((x) & (y)) | ((x) & (z)) | ((y) & (z)))
+#define H(x, y, z) ((x) ^ (y) ^ (z))
+
+/* ROTATE_LEFT rotates x left n bits.
+ */
+#define ROTATE_LEFT(x, n) (((x) << (n)) | ((x) >> (32-(n))))
+
+/* FF, GG and HH are transformations for rounds 1, 2 and 3 */
+/* Rotation is separate from addition to prevent recomputation */
+
+#define FF(a, b, c, d, x, s) { \
+    (a) += F ((b), (c), (d)) + (x); \
+    (a) = ROTATE_LEFT ((a), (s)); \
+  }
+#define GG(a, b, c, d, x, s) { \
+    (a) += G ((b), (c), (d)) + (x) + (uint32_t)0x5a827999; \
+    (a) = ROTATE_LEFT ((a), (s)); \
+  }
+#define HH(a, b, c, d, x, s) { \
+    (a) += H ((b), (c), (d)) + (x) + (uint32_t)0x6ed9eba1; \
+    (a) = ROTATE_LEFT ((a), (s)); \
+  }
+
+/* MD4 initialization. Begins an MD4 operation, writing a new context.
+ */
+void MD4Init (MD4_CTX *context)
+{
+  context->count[0] = context->count[1] = 0;
+
+  /* Load magic initialization constants.
+   */
+  context->state[0] = 0x67452301;
+  context->state[1] = 0xefcdab89;
+  context->state[2] = 0x98badcfe;
+  context->state[3] = 0x10325476;
+}
+
+/* MD4 block update operation. Continues an MD4 message-digest
+     operation, processing another message block, and updating the
+     context.
+ */
+void MD4Update (MD4_CTX *context, unsigned char *input, unsigned int inputLen)
+{
+  unsigned int i, index, partLen;
+
+  /* Compute number of bytes mod 64 */
+  index = (unsigned int)((context->count[0] >> 3) & 0x3F);
+  /* Update number of bits */
+  if ((context->count[0] += ((uint32_t)inputLen << 3))
+      < ((uint32_t)inputLen << 3))
+    context->count[1]++;
+  context->count[1] += ((uint32_t)inputLen >> 29);
+
+  partLen = 64 - index;
+
+  /* Transform as many times as possible.
+   */
+  if (inputLen >= partLen) {
+    memcpy
+      ((unsigned char *)&context->buffer[index], (unsigned char *)input, partLen);
+    MD4Transform (context->state, context->buffer);
+
+    for (i = partLen; i + 63 < inputLen; i += 64)
+      MD4Transform (context->state, &input[i]);
+
+    index = 0;
+  }
+  else
+    i = 0;
+
+  /* Buffer remaining input */
+  memcpy
+    ((unsigned char *)&context->buffer[index], (unsigned char *)&input[i],
+     inputLen-i);
+}
+
+/* MD4 finalization. Ends an MD4 message-digest operation, writing the
+     the message digest and zeroizing the context.
+ */
+void MD4Final (unsigned char digest[16], MD4_CTX *context)
+{
+  unsigned char bits[8];
+  unsigned int index, padLen;
+
+  /* Save number of bits */
+  Encode (bits, context->count, 8);
+
+  /* Pad out to 56 mod 64.
+   */
+  index = (unsigned int)((context->count[0] >> 3) & 0x3f);
+  padLen = (index < 56) ? (56 - index) : (120 - index);
+  MD4Update (context, PADDING, padLen);
+
+  /* Append length (before padding) */
+  MD4Update (context, bits, 8);
+  /* Store state in digest */
+  Encode (digest, context->state, 16);
+
+  /* Zeroize sensitive information.
+   */
+  memset ((unsigned char *)context, 0, sizeof (*context));
+
+}
+
+/* MD4 basic transformation. Transforms state based on block.
+ */
+static void MD4Transform (uint32_t state[4], unsigned char block[64])
+{
+  uint32_t a = state[0], b = state[1], c = state[2], d = state[3], x[16];
+
+  Decode (x, block, 64);
+
+  /* Round 1 */
+  FF (a, b, c, d, x[ 0], S11); /* 1 */
+  FF (d, a, b, c, x[ 1], S12); /* 2 */
+  FF (c, d, a, b, x[ 2], S13); /* 3 */
+  FF (b, c, d, a, x[ 3], S14); /* 4 */
+  FF (a, b, c, d, x[ 4], S11); /* 5 */
+  FF (d, a, b, c, x[ 5], S12); /* 6 */
+  FF (c, d, a, b, x[ 6], S13); /* 7 */
+  FF (b, c, d, a, x[ 7], S14); /* 8 */
+  FF (a, b, c, d, x[ 8], S11); /* 9 */
+  FF (d, a, b, c, x[ 9], S12); /* 10 */
+  FF (c, d, a, b, x[10], S13); /* 11 */
+  FF (b, c, d, a, x[11], S14); /* 12 */
+  FF (a, b, c, d, x[12], S11); /* 13 */
+  FF (d, a, b, c, x[13], S12); /* 14 */
+  FF (c, d, a, b, x[14], S13); /* 15 */
+  FF (b, c, d, a, x[15], S14); /* 16 */
+
+  /* Round 2 */
+  GG (a, b, c, d, x[ 0], S21); /* 17 */
+  GG (d, a, b, c, x[ 4], S22); /* 18 */
+  GG (c, d, a, b, x[ 8], S23); /* 19 */
+  GG (b, c, d, a, x[12], S24); /* 20 */
+  GG (a, b, c, d, x[ 1], S21); /* 21 */
+  GG (d, a, b, c, x[ 5], S22); /* 22 */
+  GG (c, d, a, b, x[ 9], S23); /* 23 */
+  GG (b, c, d, a, x[13], S24); /* 24 */
+  GG (a, b, c, d, x[ 2], S21); /* 25 */
+  GG (d, a, b, c, x[ 6], S22); /* 26 */
+  GG (c, d, a, b, x[10], S23); /* 27 */
+  GG (b, c, d, a, x[14], S24); /* 28 */
+  GG (a, b, c, d, x[ 3], S21); /* 29 */
+  GG (d, a, b, c, x[ 7], S22); /* 30 */
+  GG (c, d, a, b, x[11], S23); /* 31 */
+  GG (b, c, d, a, x[15], S24); /* 32 */
+
+  /* Round 3 */
+  HH (a, b, c, d, x[ 0], S31); /* 33 */
+  HH (d, a, b, c, x[ 8], S32); /* 34 */
+  HH (c, d, a, b, x[ 4], S33); /* 35 */
+  HH (b, c, d, a, x[12], S34); /* 36 */
+  HH (a, b, c, d, x[ 2], S31); /* 37 */
+  HH (d, a, b, c, x[10], S32); /* 38 */
+  HH (c, d, a, b, x[ 6], S33); /* 39 */
+  HH (b, c, d, a, x[14], S34); /* 40 */
+  HH (a, b, c, d, x[ 1], S31); /* 41 */
+  HH (d, a, b, c, x[ 9], S32); /* 42 */
+  HH (c, d, a, b, x[ 5], S33); /* 43 */
+  HH (b, c, d, a, x[13], S34); /* 44 */
+  HH (a, b, c, d, x[ 3], S31); /* 45 */
+  HH (d, a, b, c, x[11], S32); /* 46 */
+  HH (c, d, a, b, x[ 7], S33); /* 47 */
+  HH (b, c, d, a, x[15], S34); /* 48 */
+
+  state[0] += a;
+  state[1] += b;
+  state[2] += c;
+  state[3] += d;
+
+  /* Zeroize sensitive information.
+   */
+  memset ((unsigned char *)x, 0, sizeof (x));
+}
+
+/* Encodes input (uint32_t) into output (unsigned char). Assumes len is
+     a multiple of 4.
+ */
+static void Encode (unsigned char *output, uint32_t *input, unsigned int len)
+{
+  unsigned int i, j;
+
+  for (i = 0, j = 0; j < len; i++, j += 4) {
+    output[j] = (unsigned char)(input[i] & 0xff);
+    output[j+1] = (unsigned char)((input[i] >> 8) & 0xff);
+    output[j+2] = (unsigned char)((input[i] >> 16) & 0xff);
+    output[j+3] = (unsigned char)((input[i] >> 24) & 0xff);
+  }
+}
+
+/* Decodes input (unsigned char) into output (uint32_t). Assumes len is
+     a multiple of 4.
+
+ */
+static void Decode (uint32_t *output, unsigned char *input, unsigned int len)
+{
+  unsigned int i, j;
+
+  for (i = 0, j = 0; j < len; i++, j += 4)
+    output[i] = ((uint32_t)input[j]) | (((uint32_t)input[j+1]) << 8) |
+      (((uint32_t)input[j+2]) << 16) | (((uint32_t)input[j+3]) << 24);
+}
diff --git a/src/md4.h b/src/md4.h
new file mode 100644 (file)
index 0000000..26f7bc5
--- /dev/null
+++ b/src/md4.h
@@ -0,0 +1,34 @@
+/* Copyright (C) 1991-2, RSA Data Security, Inc. Created 1991. All
+   rights reserved.
+
+   License to copy and use this software is granted provided that it
+   is identified as the "RSA Data Security, Inc. MD4 Message-Digest
+   Algorithm" in all material mentioning or referencing this software
+   or this function.
+
+   License is also granted to make and use derivative works provided
+   that such works are identified as "derived from the RSA Data
+   Security, Inc. MD4 Message-Digest Algorithm" in all material
+   mentioning or referencing the derived work.
+
+   RSA Data Security, Inc. makes no representations concerning either
+   the merchantability of this software or the suitability of this
+   software for any particular purpose. It is provided "as is"
+   without express or implied warranty of any kind.
+
+   These notices must be retained in any copies of any part of this
+   documentation and/or software.
+ */
+
+#include <inttypes.h>
+
+/* MD4 context. */
+typedef struct {
+       uint32_t state[4];                      /* state (ABCD) */
+       uint32_t count[2];                      /* number of bits, modulo 2^64 (lsb first) */
+       unsigned char buffer[64];       /* input buffer */
+} MD4_CTX;
+
+void MD4Init(MD4_CTX *);
+void MD4Update(MD4_CTX *, unsigned char *, unsigned int);
+void MD4Final(unsigned char [16], MD4_CTX *);
index c8c62cb..5c0f81f 100644 (file)
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <errno.h>
+#include "assman_impl.h"
+#include "tpool.h"
+#include "md4.h"
 
 #ifdef BUILD_MOD_URL
+#include <pthread.h>
 #include <curl/curl.h>
 
+enum {
+       DL_UNKNOWN,
+       DL_STARTED,
+       DL_ERROR,
+       DL_DONE
+};
+
+struct file_info {
+       char *url;
+       char *cache_fname;
+
+       FILE *cache_file;
+
+       /* fopen-thread waits until the state becomes known (request starts transmitting or fails) */
+       int state;
+       pthread_cond_t state_cond;
+       pthread_mutex_t state_mutex;
+};
+
+static void *fop_open(const char *fname, void *udata);
+static void fop_close(void *fp, void *udata);
+static long fop_seek(void *fp, long offs, int whence, void *udata);
+static long fop_read(void *fp, void *buf, long size, void *udata);
+
+static void exit_cleanup(void);
+static void download(void *data);
+static size_t recv_callback(char *ptr, size_t size, size_t nmemb, void *udata);
 static const char *get_temp_dir(void);
 
 static char *tmpdir, *cachedir;
+static struct thread_pool *tpool;
+static CURL **curl;
 
 struct ass_fileops *ass_alloc_url(const char *url)
 {
        static int done_init;
        struct ass_fileops *fop;
+       int i;
 
        if(!done_init) {
                curl_global_init(CURL_GLOBAL_ALL);
-               atexit(curl_global_cleanup);
-               done_init = 1;
+               atexit(exit_cleanup);
 
                if(!*ass_mod_url_cachedir) {
                        strcpy(ass_mod_url_cachedir, "assman_cache");
                }
-               tmpdir = get_temp_dir();
+               tmpdir = (char*)get_temp_dir();
                if(!(cachedir = malloc(strlen(ass_mod_url_cachedir) + strlen(tmpdir) + 2))) {
                        fprintf(stderr, "assman: failed to allocate cachedir path buffer\n");
-                       return 0;
+                       goto init_failed;
                }
                sprintf(cachedir, "%s/%s", tmpdir, ass_mod_url_cachedir);
+
+               if(ass_mod_url_max_threads <= 0) {
+                       ass_mod_url_max_threads = 8;
+               }
+
+               if(!(curl = calloc(ass_mod_url_max_threads, sizeof *curl))) {
+                       perror("assman: failed to allocate curl context table");
+                       goto init_failed;
+               }
+               for(i=0; i<ass_mod_url_max_threads; i++) {
+                       if(!(curl[i] = curl_easy_init())) {
+                               goto init_failed;
+                       }
+                       curl_easy_setopt(curl[i], CURLOPT_WRITEFUNCTION, recv_callback);
+               }
+
+               if(!(tpool = ass_tpool_create(ass_mod_url_max_threads))) {
+                       fprintf(stderr, "assman: failed to create thread pool\n");
+                       goto init_failed;
+               }
+
+               done_init = 1;
        }
 
        if(!(fop = malloc(sizeof *fop))) {
                return 0;
        }
+       if(!(fop->udata = malloc(strlen(url) + 1))) {
+               free(fop);
+               return 0;
+       }
+       strcpy(fop->udata, url);
 
-       fop->udata = 0;
        fop->open = fop_open;
        fop->close = fop_close;
        fop->seek = fop_seek;
        fop->read = fop_read;
        return fop;
+
+init_failed:
+       free(cachedir);
+       if(curl) {
+               for(i=0; i<ass_mod_url_max_threads; i++) {
+                       if(curl[i]) {
+                               curl_easy_cleanup(curl[i]);
+                       }
+               }
+               free(curl);
+       }
+       return 0;
 }
 
+static void exit_cleanup(void)
+{
+       int i;
+
+       if(tpool) {
+               ass_tpool_destroy(tpool);
+       }
+       if(curl) {
+               for(i=0; i<ass_mod_url_max_threads; i++) {
+                       if(curl[i]) {
+                               curl_easy_cleanup(curl[i]);
+                       }
+               }
+               free(curl);
+       }
+       curl_global_cleanup();
+}
+
+
 void ass_free_url(struct ass_fileops *fop)
 {
 }
 
+static char *cache_filename(const char *fname, const char *url_prefix)
+{
+       MD4_CTX md4ctx;
+       unsigned char sum[16];
+       char sumstr[33];
+       char *resfname;
+       int i;
+       int fname_len = strlen(fname);
+       int prefix_len = strlen(url_prefix);
+       int url_len = fname_len + prefix_len + 1;
+
+       char *url = alloca(url_len + 1);
+       sprintf(url, "%s/%s", url_prefix, fname);
+
+       MD4Init(&md4ctx);
+       MD4Update(&md4ctx, (unsigned char*)url, url_len);
+       MD4Final((unsigned char*)sum, &md4ctx);
+
+       for(i=0; i<16; i++) {
+               sprintf(sumstr + i * 2, "%x", (unsigned int)sum[i]);
+       }
+       sumstr[32] = 0;
+
+       prefix_len = strlen(cachedir);
+       if(!(resfname = malloc(prefix_len + fname_len + 20))) {
+               return 0;
+       }
+       sprintf(resfname, "%s/%s-%s", cachedir, fname, sumstr);
+       return resfname;
+}
+
 static void *fop_open(const char *fname, void *udata)
 {
-       return 0;       /* TODO */
+       struct file_info *file;
+       int state;
+
+       if(!(file = malloc(sizeof *file))) {
+               ass_errno = ENOMEM;
+               return 0;
+       }
+       if(!(file->cache_fname = cache_filename(fname, udata))) {
+               free(file);
+               ass_errno = ENOMEM;
+               return 0;
+       }
+       if(!(file->cache_file = fopen(file->cache_fname, "wb"))) {
+               fprintf(stderr, "assman: mod_url: failed to open cache file (%s) for writing: %s\n",
+                               file->cache_fname, strerror(errno));
+               ass_errno = errno;
+               free(file->cache_fname);
+               free(file);
+               return 0;
+       }
+
+       file->state = DL_UNKNOWN;
+       pthread_mutex_init(&file->state_mutex, 0);
+       pthread_cond_init(&file->state_cond, 0);
+
+       ass_tpool_enqueue(tpool, file, download, 0);
+
+       /* wait until the file changes state */
+       pthread_mutex_lock(&file->state_mutex);
+       while(file->state == DL_UNKNOWN) {
+               pthread_cond_wait(&file->state_cond, &file->state_mutex);
+       }
+       state = file->state;
+       pthread_mutex_unlock(&file->state_mutex);
+
+       if(state == DL_ERROR) {
+               /* the worker stopped, so we can safely cleanup and return error */
+               fclose(file->cache_file);
+               remove(file->cache_fname);
+               free(file->cache_fname);
+               pthread_cond_destroy(&file->state_cond);
+               pthread_mutex_destroy(&file->state_mutex);
+               free(file);
+               ass_errno = ENOENT;     /* TODO: differentiate between 403 and 404 */
+               return 0;
+       }
+       return file;
+}
+
+static void wait_done(struct file_info *file)
+{
+       pthread_mutex_lock(&file->state_mutex);
+       while(file->state != DL_DONE && file->state != DL_ERROR) {
+               pthread_cond_wait(&file->state_cond, &file->state_mutex);
+       }
+       pthread_mutex_unlock(&file->state_mutex);
 }
 
 static void fop_close(void *fp, void *udata)
 {
+       struct file_info *file = fp;
+
+       wait_done(file);        /* TODO: stop download instead of waiting to finish */
+
+       fclose(file->cache_file);
+       if(file->state == DL_ERROR) remove(file->cache_fname);
+       free(file->cache_fname);
+       pthread_cond_destroy(&file->state_cond);
+       pthread_mutex_destroy(&file->state_mutex);
+       free(file);
 }
 
 static long fop_seek(void *fp, long offs, int whence, void *udata)
 {
-       return 0;
+       struct file_info *file = fp;
+       wait_done(file);
+
+       fseek(file->cache_file, offs, whence);
+       return ftell(file->cache_file);
 }
 
 static long fop_read(void *fp, void *buf, long size, void *udata)
 {
-       return 0;
+       struct file_info *file = fp;
+       wait_done(file);
+
+       return fread(buf, 1, size, file->cache_file);
 }
 
-#else
+/* this is the function called by the worker threads to perform the download
+ * signal state changes, and prepare the cache file for reading
+ */
+static void download(void *data)
+{
+       int tid, res;
+       struct file_info *file = data;
+
+       tid = ass_tpool_thread_id(tpool);
+
+       curl_easy_setopt(curl[tid], CURLOPT_URL, file->url);
+       curl_easy_setopt(curl[tid], CURLOPT_WRITEDATA, file);
+       res = curl_easy_perform(curl[tid]);
+
+       pthread_mutex_lock(&file->state_mutex);
+       if(res == CURLE_OK) {
+               file->state = DL_DONE;
+               fclose(file->cache_file);
+               if(!(file->cache_file = fopen(file->cache_fname, "rb"))) {
+                       fprintf(stderr, "assman: failed to reopen cache file (%s) for reading: %s\n",
+                                       file->cache_fname, strerror(errno));
+                       file->state = DL_ERROR;
+               }
+       } else {
+               file->state = DL_ERROR;
+       }
+       pthread_cond_broadcast(&file->state_cond);
+       pthread_mutex_unlock(&file->state_mutex);
+}
+
+/* this function is called by curl to pass along downloaded data chunks */
+static size_t recv_callback(char *ptr, size_t size, size_t count, void *udata)
+{
+       struct file_info *file = udata;
+
+       pthread_mutex_lock(&file->state_mutex);
+       if(file->state == DL_UNKNOWN) {
+               file->state = DL_STARTED;
+               pthread_cond_broadcast(&file->state_cond);
+       }
+       pthread_mutex_unlock(&file->state_mutex);
+
+       return fwrite(ptr, size, count, file->cache_file);
+}
+
+#ifdef WIN32
+#include <windows.h>
+
+static const char *get_temp_dir(void)
+{
+       static char buf[MAX_PATH + 1];
+       GetTempPathA(MAX_PATH + 1, buf);
+       return buf;
+}
+#else  /* UNIX */
+static const char *get_temp_dir(void)
+{
+       char *env = getenv("TMPDIR");
+       return env ? env : "/tmp";
+}
+#endif
+
+
+
+#else  /* don't build mod_url */
 struct ass_fileops *ass_alloc_url(const char *url)
 {
        fprintf(stderr, "assman: compiled without URL asset source support\n");
diff --git a/src/tpool.c b/src/tpool.c
new file mode 100644 (file)
index 0000000..500ef6c
--- /dev/null
@@ -0,0 +1,493 @@
+/* worker thread pool based on POSIX threads
+ * author: John Tsiombikas <nuclear@member.fsf.org>
+ * This code is public domain.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <pthread.h>
+#include "tpool.h"
+
+#if defined(__APPLE__) && defined(__MACH__)
+# ifndef __unix__
+#  define __unix__     1
+# endif        /* unix */
+# ifndef __bsd__
+#  define __bsd__      1
+# endif        /* bsd */
+#endif /* apple */
+
+#if defined(unix) || defined(__unix__)
+#include <unistd.h>
+#include <sys/time.h>
+
+# ifdef __bsd__
+#  include <sys/sysctl.h>
+# endif
+#endif
+
+#if defined(WIN32) || defined(__WIN32__)
+#include <windows.h>
+#endif
+
+
+struct work_item {
+       void *data;
+       tpool_callback work, done;
+       struct work_item *next;
+};
+
+struct thread_data {
+       int id;
+       struct thread_pool *pool;
+};
+
+struct thread_pool {
+       pthread_t *threads;
+       struct thread_data *tdata;
+       int num_threads;
+       pthread_key_t idkey;
+
+       int qsize;
+       struct work_item *workq, *workq_tail;
+       pthread_mutex_t workq_mutex;
+       pthread_cond_t workq_condvar;
+
+       int nactive;    /* number of active workers (not sleeping) */
+
+       pthread_cond_t done_condvar;
+
+       int should_quit;
+       int in_batch;
+
+       int nref;       /* reference count */
+
+#if defined(WIN32) || defined(__WIN32__)
+       HANDLE wait_event;
+#else
+       int wait_pipe[2];
+#endif
+};
+
+static void *thread_func(void *args);
+static void send_done_event(struct thread_pool *tpool);
+
+static struct work_item *alloc_work_item(void);
+static void free_work_item(struct work_item *w);
+
+
+struct thread_pool *ass_tpool_create(int num_threads)
+{
+       int i;
+       struct thread_pool *tpool;
+
+       if(!(tpool = calloc(1, sizeof *tpool))) {
+               return 0;
+       }
+       pthread_mutex_init(&tpool->workq_mutex, 0);
+       pthread_cond_init(&tpool->workq_condvar, 0);
+       pthread_cond_init(&tpool->done_condvar, 0);
+       pthread_key_create(&tpool->idkey, 0);
+
+       pthread_setspecific(tpool->idkey, (void*)0xffffffff);
+
+#if !defined(WIN32) && !defined(__WIN32__)
+       tpool->wait_pipe[0] = tpool->wait_pipe[1] = -1;
+#endif
+
+       if(num_threads <= 0) {
+               num_threads = ass_tpool_num_processors();
+       }
+       tpool->num_threads = num_threads;
+
+       if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
+               free(tpool);
+               return 0;
+       }
+       if(!(tpool->tdata = malloc(num_threads * sizeof *tpool->tdata))) {
+               free(tpool->threads);
+               free(tpool);
+               return 0;
+       }
+
+       for(i=0; i<num_threads; i++) {
+               tpool->tdata[i].id = i;
+               tpool->tdata[i].pool = tpool;
+
+               if(pthread_create(tpool->threads + i, 0, thread_func, tpool->tdata + i) == -1) {
+                       /*tpool->threads[i] = 0;*/
+                       ass_tpool_destroy(tpool);
+                       return 0;
+               }
+       }
+       return tpool;
+}
+
+void ass_tpool_destroy(struct thread_pool *tpool)
+{
+       int i;
+       if(!tpool) return;
+
+       ass_tpool_clear(tpool);
+       tpool->should_quit = 1;
+
+       pthread_cond_broadcast(&tpool->workq_condvar);
+
+       if(tpool->threads) {
+               printf("thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
+               fflush(stdout);
+
+               for(i=0; i<tpool->num_threads; i++) {
+                       pthread_join(tpool->threads[i], 0);
+                       putchar('.');
+                       fflush(stdout);
+               }
+               putchar('\n');
+               free(tpool->threads);
+       }
+       free(tpool->tdata);
+
+       /* also wake up anyone waiting on the wait* calls */
+       tpool->nactive = 0;
+       pthread_cond_broadcast(&tpool->done_condvar);
+       send_done_event(tpool);
+
+       pthread_mutex_destroy(&tpool->workq_mutex);
+       pthread_cond_destroy(&tpool->workq_condvar);
+       pthread_cond_destroy(&tpool->done_condvar);
+       pthread_key_delete(tpool->idkey);
+
+#if defined(WIN32) || defined(__WIN32__)
+       if(tpool->wait_event) {
+               CloseHandle(tpool->wait_event);
+       }
+#else
+       if(tpool->wait_pipe[0] >= 0) {
+               close(tpool->wait_pipe[0]);
+               close(tpool->wait_pipe[1]);
+       }
+#endif
+}
+
+int ass_tpool_addref(struct thread_pool *tpool)
+{
+       return ++tpool->nref;
+}
+
+int ass_tpool_release(struct thread_pool *tpool)
+{
+       if(--tpool->nref <= 0) {
+               ass_tpool_destroy(tpool);
+               return 0;
+       }
+       return tpool->nref;
+}
+
+void ass_tpool_begin_batch(struct thread_pool *tpool)
+{
+       tpool->in_batch = 1;
+}
+
+void ass_tpool_end_batch(struct thread_pool *tpool)
+{
+       tpool->in_batch = 0;
+       pthread_cond_broadcast(&tpool->workq_condvar);
+}
+
+int ass_tpool_enqueue(struct thread_pool *tpool, void *data,
+               tpool_callback work_func, tpool_callback done_func)
+{
+       struct work_item *job;
+
+       if(!(job = alloc_work_item())) {
+               return -1;
+       }
+       job->work = work_func;
+       job->done = done_func;
+       job->data = data;
+       job->next = 0;
+
+       pthread_mutex_lock(&tpool->workq_mutex);
+       if(tpool->workq) {
+               tpool->workq_tail->next = job;
+               tpool->workq_tail = job;
+       } else {
+               tpool->workq = tpool->workq_tail = job;
+       }
+       ++tpool->qsize;
+       pthread_mutex_unlock(&tpool->workq_mutex);
+
+       if(!tpool->in_batch) {
+               pthread_cond_broadcast(&tpool->workq_condvar);
+       }
+       return 0;
+}
+
+void ass_tpool_clear(struct thread_pool *tpool)
+{
+       pthread_mutex_lock(&tpool->workq_mutex);
+       while(tpool->workq) {
+               void *tmp = tpool->workq;
+               tpool->workq = tpool->workq->next;
+               free(tmp);
+       }
+       tpool->workq = tpool->workq_tail = 0;
+       tpool->qsize = 0;
+       pthread_mutex_unlock(&tpool->workq_mutex);
+}
+
+int ass_tpool_queued_jobs(struct thread_pool *tpool)
+{
+       int res;
+       pthread_mutex_lock(&tpool->workq_mutex);
+       res = tpool->qsize;
+       pthread_mutex_unlock(&tpool->workq_mutex);
+       return res;
+}
+
+int ass_tpool_active_jobs(struct thread_pool *tpool)
+{
+       int res;
+       pthread_mutex_lock(&tpool->workq_mutex);
+       res = tpool->nactive;
+       pthread_mutex_unlock(&tpool->workq_mutex);
+       return res;
+}
+
+int ass_tpool_pending_jobs(struct thread_pool *tpool)
+{
+       int res;
+       pthread_mutex_lock(&tpool->workq_mutex);
+       res = tpool->qsize + tpool->nactive;
+       pthread_mutex_unlock(&tpool->workq_mutex);
+       return res;
+}
+
+void ass_tpool_wait(struct thread_pool *tpool)
+{
+       pthread_mutex_lock(&tpool->workq_mutex);
+       while(tpool->nactive || tpool->qsize) {
+               pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
+       }
+       pthread_mutex_unlock(&tpool->workq_mutex);
+}
+
+void ass_tpool_wait_pending(struct thread_pool *tpool, int pending_target)
+{
+       pthread_mutex_lock(&tpool->workq_mutex);
+       while(tpool->qsize + tpool->nactive > pending_target) {
+               pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
+       }
+       pthread_mutex_unlock(&tpool->workq_mutex);
+}
+
+#if defined(WIN32) || defined(__WIN32__)
+long ass_tpool_timedwait(struct thread_pool *tpool, long timeout)
+{
+       fprintf(stderr, "tpool_timedwait currently unimplemented on windows\n");
+       abort();
+       return 0;
+}
+
+/* TODO: actually does this work with MinGW ? */
+int ass_tpool_get_wait_fd(struct thread_pool *tpool)
+{
+       static int once;
+       if(!once) {
+               once = 1;
+               fprintf(stderr, "warning: ass_tpool_get_wait_fd call on Windows does nothing\n");
+       }
+       return 0;
+}
+
+void *ass_tpool_get_wait_handle(struct thread_pool *tpool)
+{
+       if(!tpool->wait_event) {
+               if(!(tpool->wait_event = CreateEvent(0, 0, 0, 0))) {
+                       return 0;
+               }
+       }
+       return tpool->wait_event;
+}
+
+static void send_done_event(struct thread_pool *tpool)
+{
+       if(tpool->wait_event) {
+               SetEvent(tpool->wait_event);
+       }
+}
+
+#else          /* UNIX */
+
+long ass_tpool_timedwait(struct thread_pool *tpool, long timeout)
+{
+       struct timespec tout_ts;
+       struct timeval tv0, tv;
+       gettimeofday(&tv0, 0);
+
+       long sec = timeout / 1000;
+       tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
+       tout_ts.tv_sec = tv0.tv_sec + sec;
+
+       pthread_mutex_lock(&tpool->workq_mutex);
+       while(tpool->nactive || tpool->qsize) {
+               if(pthread_cond_timedwait(&tpool->done_condvar,
+                                       &tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
+                       break;
+               }
+       }
+       pthread_mutex_unlock(&tpool->workq_mutex);
+
+       gettimeofday(&tv, 0);
+       return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
+}
+
+int ass_tpool_get_wait_fd(struct thread_pool *tpool)
+{
+       if(tpool->wait_pipe[0] < 0) {
+               if(pipe(tpool->wait_pipe) == -1) {
+                       return -1;
+               }
+       }
+       return tpool->wait_pipe[0];
+}
+
+void *ass_tpool_get_wait_handle(struct thread_pool *tpool)
+{
+       static int once;
+       if(!once) {
+               once = 1;
+               fprintf(stderr, "warning: ass_tpool_get_wait_handle call on UNIX does nothing\n");
+       }
+       return 0;
+}
+
+static void send_done_event(struct thread_pool *tpool)
+{
+       if(tpool->wait_pipe[1] >= 0) {
+               write(tpool->wait_pipe[1], tpool, 1);
+       }
+}
+#endif /* WIN32/UNIX */
+
+static void *thread_func(void *args)
+{
+       struct thread_data *tdata = args;
+       struct thread_pool *tpool = tdata->pool;
+
+       pthread_setspecific(tpool->idkey, (void*)(intptr_t)tdata->id);
+
+       pthread_mutex_lock(&tpool->workq_mutex);
+       while(!tpool->should_quit) {
+               if(!tpool->workq) {
+                       pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);
+                       if(tpool->should_quit) break;
+               }
+
+               while(!tpool->should_quit && tpool->workq) {
+                       /* grab the first job */
+                       struct work_item *job = tpool->workq;
+                       tpool->workq = tpool->workq->next;
+                       if(!tpool->workq)
+                               tpool->workq_tail = 0;
+                       ++tpool->nactive;
+                       --tpool->qsize;
+                       pthread_mutex_unlock(&tpool->workq_mutex);
+
+                       /* do the job */
+                       job->work(job->data);
+                       if(job->done) {
+                               job->done(job->data);
+                       }
+                       free_work_item(job);
+
+                       pthread_mutex_lock(&tpool->workq_mutex);
+                       /* notify everyone interested that we're done with this job */
+                       pthread_cond_broadcast(&tpool->done_condvar);
+                       send_done_event(tpool);
+                       --tpool->nactive;
+               }
+       }
+       pthread_mutex_unlock(&tpool->workq_mutex);
+
+       return 0;
+}
+
+
+int ass_tpool_thread_id(struct thread_pool *tpool)
+{
+       int id = (intptr_t)pthread_getspecific(tpool->idkey);
+       if(id >= tpool->num_threads) {
+               return -1;
+       }
+       return id;
+}
+
+
+/* The following highly platform-specific code detects the number
+ * of processors available in the system. It's used by the thread pool
+ * to autodetect how many threads to spawn.
+ * Currently works on: Linux, BSD, Darwin, and Windows.
+ */
+int ass_tpool_num_processors(void)
+{
+#if defined(unix) || defined(__unix__)
+# if defined(__bsd__)
+       /* BSD systems provide the num.processors through sysctl */
+       int num, mib[] = {CTL_HW, HW_NCPU};
+       size_t len = sizeof num;
+
+       sysctl(mib, 2, &num, &len, 0, 0);
+       return num;
+
+# elif defined(__sgi)
+       /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
+       return sysconf(_SC_NPROC_ONLN);
+# else
+       /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
+       return sysconf(_SC_NPROCESSORS_ONLN);
+# endif        /* bsd/sgi/other */
+
+#elif defined(WIN32) || defined(__WIN32__)
+       /* under windows we need to call GetSystemInfo */
+       SYSTEM_INFO info;
+       GetSystemInfo(&info);
+       return info.dwNumberOfProcessors;
+#endif
+}
+
+#define MAX_WPOOL_SIZE 64
+static pthread_mutex_t wpool_lock = PTHREAD_MUTEX_INITIALIZER;
+static struct work_item *wpool;
+static int wpool_size;
+
+/* work item allocator */
+static struct work_item *alloc_work_item(void)
+{
+       struct work_item *w;
+
+       pthread_mutex_lock(&wpool_lock);
+       if(!wpool) {
+               pthread_mutex_unlock(&wpool_lock);
+               return malloc(sizeof(struct work_item));
+       }
+
+       w = wpool;
+       wpool = wpool->next;
+       --wpool_size;
+       pthread_mutex_unlock(&wpool_lock);
+       return w;
+}
+
+static void free_work_item(struct work_item *w)
+{
+       pthread_mutex_lock(&wpool_lock);
+       if(wpool_size >= MAX_WPOOL_SIZE) {
+               free(w);
+       } else {
+               w->next = wpool;
+               wpool = w;
+               ++wpool_size;
+       }
+       pthread_mutex_unlock(&wpool_lock);
+}
diff --git a/src/tpool.h b/src/tpool.h
new file mode 100644 (file)
index 0000000..0bb4747
--- /dev/null
@@ -0,0 +1,85 @@
+/* worker thread pool based on POSIX threads
+ * author: John Tsiombikas <nuclear@member.fsf.org>
+ * This code is public domain.
+ */
+#ifndef ASSMAN_THREADPOOL_H_
+#define ASSMAN_THREADPOOL_H_
+
+struct thread_pool;
+
+/* type of the function accepted as work or completion callback */
+typedef void (*tpool_callback)(void*);
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* if num_threads == 0, auto-detect how many threads to spawn */
+struct thread_pool *ass_tpool_create(int num_threads);
+void ass_tpool_destroy(struct thread_pool *tpool);
+
+/* optional reference counting interface for thread pool sharing */
+int ass_tpool_addref(struct thread_pool *tpool);
+int ass_tpool_release(struct thread_pool *tpool);      /* will ass_tpool_destroy on nref 0 */
+
+/* if begin_batch is called before an enqueue, the worker threads will not be
+ * signalled to start working until end_batch is called.
+ */
+void ass_tpool_begin_batch(struct thread_pool *tpool);
+void ass_tpool_end_batch(struct thread_pool *tpool);
+
+/* if enqueue is called without calling begin_batch first, it will immediately
+ * wake up the worker threads to start working on the enqueued item
+ */
+int ass_tpool_enqueue(struct thread_pool *tpool, void *data,
+               tpool_callback work_func, tpool_callback done_func);
+/* clear the work queue. does not cancel any currently running jobs */
+void ass_tpool_clear(struct thread_pool *tpool);
+
+/* returns the number of queued work items */
+int ass_tpool_queued_jobs(struct thread_pool *tpool);
+/* returns the number of active (working) threads */
+int ass_tpool_active_jobs(struct thread_pool *tpool);
+/* returns the number of pending jobs, both in queue and active */
+int ass_tpool_pending_jobs(struct thread_pool *tpool);
+
+/* wait for all pending jobs to be completed */
+void ass_tpool_wait(struct thread_pool *tpool);
+/* wait until the pending jobs are down to the target specified
+ * for example, to wait until a single job has been completed:
+ *   ass_tpool_wait_pending(tpool, ass_tpool_pending_jobs(tpool) - 1);
+ * this interface is slightly awkward to avoid race conditions. */
+void ass_tpool_wait_pending(struct thread_pool *tpool, int pending_target);
+/* wait for all pending jobs to be completed for up to "timeout" milliseconds */
+long ass_tpool_timedwait(struct thread_pool *tpool, long timeout);
+
+/* return a file descriptor which can be used to wait for pending job
+ * completion events. A single char is written every time a job completes.
+ * You should empty the pipe every time you receive such an event.
+ *
+ * This is a UNIX-specific call. On windows it does nothing.
+ */
+int ass_tpool_get_wait_fd(struct thread_pool *tpool);
+
+/* return an auto-resetting Event HANDLE which can be used to wait for
+ * pending job completion events.
+ *
+ * This is a Win32-specific call. On UNIX it does nothing.
+ */
+void *ass_tpool_get_wait_handle(struct thread_pool *tpool);
+
+/* When called by a work/done callback, it returns the thread number executing
+ * it. From the main thread it returns -1.
+ */
+int ass_tpool_thread_id(struct thread_pool *tpool);
+
+/* returns the number of processors on the system.
+ * individual cores in multi-core processors are counted as processors.
+ */
+int ass_tpool_num_processors(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ASSMAN_THREADPOOL_H_ */