*.swp
*.a
*.so*
+examples/asscat/asscat
warn = -pedantic -Wall
dbg = -g
opt = -O0
+def = -DBUILD_MOD_URL
+pic = -fPIC
-CFLAGS = $(warn) $(dbg) $(opt) $(inc)
-LDFLAGS = -lcurl
+CFLAGS = $(warn) $(dbg) $(opt) $(pic) $(def) $(inc) -pthread
+LDFLAGS = -pthread -lpthread -lcurl
.PHONY: all
all: $(lib_so) $(lib_a) $(soname) $(devlink)
--- /dev/null
+/* MD4C.C - RSA Data Security, Inc., MD4 message-digest algorithm
+ */
+
+/* Copyright (C) 1990-2, RSA Data Security, Inc. All rights reserved.
+
+ License to copy and use this software is granted provided that it
+ is identified as the "RSA Data Security, Inc. MD4 Message-Digest
+ Algorithm" in all material mentioning or referencing this software
+ or this function.
+
+ License is also granted to make and use derivative works provided
+ that such works are identified as "derived from the RSA Data
+ Security, Inc. MD4 Message-Digest Algorithm" in all material
+ mentioning or referencing the derived work.
+
+ RSA Data Security, Inc. makes no representations concerning either
+ the merchantability of this software or the suitability of this
+ software for any particular purpose. It is provided "as is"
+ without express or implied warranty of any kind.
+
+ These notices must be retained in any copies of any part of this
+ documentation and/or software.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include "md4.h"
+
+/* Constants for MD4Transform routine.
+ */
+#define S11 3
+#define S12 7
+#define S13 11
+#define S14 19
+#define S21 3
+#define S22 5
+#define S23 9
+#define S24 13
+#define S31 3
+#define S32 9
+#define S33 11
+#define S34 15
+
+static void MD4Transform (uint32_t [4], unsigned char [64]);
+static void Encode (unsigned char *, uint32_t *, unsigned int);
+static void Decode (uint32_t *, unsigned char *, unsigned int);
+
+static unsigned char PADDING[64] = {
+ 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+};
+
+/* F, G and H are basic MD4 functions.
+ */
+#define F(x, y, z) (((x) & (y)) | ((~x) & (z)))
+#define G(x, y, z) (((x) & (y)) | ((x) & (z)) | ((y) & (z)))
+#define H(x, y, z) ((x) ^ (y) ^ (z))
+
+/* ROTATE_LEFT rotates x left n bits.
+ */
+#define ROTATE_LEFT(x, n) (((x) << (n)) | ((x) >> (32-(n))))
+
+/* FF, GG and HH are transformations for rounds 1, 2 and 3 */
+/* Rotation is separate from addition to prevent recomputation */
+
+#define FF(a, b, c, d, x, s) { \
+ (a) += F ((b), (c), (d)) + (x); \
+ (a) = ROTATE_LEFT ((a), (s)); \
+ }
+#define GG(a, b, c, d, x, s) { \
+ (a) += G ((b), (c), (d)) + (x) + (uint32_t)0x5a827999; \
+ (a) = ROTATE_LEFT ((a), (s)); \
+ }
+#define HH(a, b, c, d, x, s) { \
+ (a) += H ((b), (c), (d)) + (x) + (uint32_t)0x6ed9eba1; \
+ (a) = ROTATE_LEFT ((a), (s)); \
+ }
+
+/* MD4 initialization. Begins an MD4 operation, writing a new context.
+ */
+void MD4Init (MD4_CTX *context)
+{
+ context->count[0] = context->count[1] = 0;
+
+ /* Load magic initialization constants.
+ */
+ context->state[0] = 0x67452301;
+ context->state[1] = 0xefcdab89;
+ context->state[2] = 0x98badcfe;
+ context->state[3] = 0x10325476;
+}
+
+/* MD4 block update operation. Continues an MD4 message-digest
+ operation, processing another message block, and updating the
+ context.
+ */
+void MD4Update (MD4_CTX *context, unsigned char *input, unsigned int inputLen)
+{
+ unsigned int i, index, partLen;
+
+ /* Compute number of bytes mod 64 */
+ index = (unsigned int)((context->count[0] >> 3) & 0x3F);
+ /* Update number of bits */
+ if ((context->count[0] += ((uint32_t)inputLen << 3))
+ < ((uint32_t)inputLen << 3))
+ context->count[1]++;
+ context->count[1] += ((uint32_t)inputLen >> 29);
+
+ partLen = 64 - index;
+
+ /* Transform as many times as possible.
+ */
+ if (inputLen >= partLen) {
+ memcpy
+ ((unsigned char *)&context->buffer[index], (unsigned char *)input, partLen);
+ MD4Transform (context->state, context->buffer);
+
+ for (i = partLen; i + 63 < inputLen; i += 64)
+ MD4Transform (context->state, &input[i]);
+
+ index = 0;
+ }
+ else
+ i = 0;
+
+ /* Buffer remaining input */
+ memcpy
+ ((unsigned char *)&context->buffer[index], (unsigned char *)&input[i],
+ inputLen-i);
+}
+
+/* MD4 finalization. Ends an MD4 message-digest operation, writing the
+   message digest and zeroizing the context.
+ */
+void MD4Final (unsigned char digest[16], MD4_CTX *context)
+{
+ unsigned char bits[8];
+ unsigned int index, padLen;
+
+ /* Save number of bits */
+ Encode (bits, context->count, 8);
+
+ /* Pad out to 56 mod 64.
+ */
+ index = (unsigned int)((context->count[0] >> 3) & 0x3f);
+ padLen = (index < 56) ? (56 - index) : (120 - index);
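+  /* e.g. index 20 pads 36 bytes up to byte 56 of the current block;
+     index 60 pads 60 bytes, crossing into the next block to reach 56 mod 64 */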
+ MD4Update (context, PADDING, padLen);
+
+ /* Append length (before padding) */
+ MD4Update (context, bits, 8);
+ /* Store state in digest */
+ Encode (digest, context->state, 16);
+
+ /* Zeroize sensitive information.
+ */
+ memset ((unsigned char *)context, 0, sizeof (*context));
+
+}
+
+/* MD4 basic transformation. Transforms state based on block.
+ */
+static void MD4Transform (uint32_t state[4], unsigned char block[64])
+{
+ uint32_t a = state[0], b = state[1], c = state[2], d = state[3], x[16];
+
+ Decode (x, block, 64);
+
+ /* Round 1 */
+ FF (a, b, c, d, x[ 0], S11); /* 1 */
+ FF (d, a, b, c, x[ 1], S12); /* 2 */
+ FF (c, d, a, b, x[ 2], S13); /* 3 */
+ FF (b, c, d, a, x[ 3], S14); /* 4 */
+ FF (a, b, c, d, x[ 4], S11); /* 5 */
+ FF (d, a, b, c, x[ 5], S12); /* 6 */
+ FF (c, d, a, b, x[ 6], S13); /* 7 */
+ FF (b, c, d, a, x[ 7], S14); /* 8 */
+ FF (a, b, c, d, x[ 8], S11); /* 9 */
+ FF (d, a, b, c, x[ 9], S12); /* 10 */
+ FF (c, d, a, b, x[10], S13); /* 11 */
+ FF (b, c, d, a, x[11], S14); /* 12 */
+ FF (a, b, c, d, x[12], S11); /* 13 */
+ FF (d, a, b, c, x[13], S12); /* 14 */
+ FF (c, d, a, b, x[14], S13); /* 15 */
+ FF (b, c, d, a, x[15], S14); /* 16 */
+
+ /* Round 2 */
+ GG (a, b, c, d, x[ 0], S21); /* 17 */
+ GG (d, a, b, c, x[ 4], S22); /* 18 */
+ GG (c, d, a, b, x[ 8], S23); /* 19 */
+ GG (b, c, d, a, x[12], S24); /* 20 */
+ GG (a, b, c, d, x[ 1], S21); /* 21 */
+ GG (d, a, b, c, x[ 5], S22); /* 22 */
+ GG (c, d, a, b, x[ 9], S23); /* 23 */
+ GG (b, c, d, a, x[13], S24); /* 24 */
+ GG (a, b, c, d, x[ 2], S21); /* 25 */
+ GG (d, a, b, c, x[ 6], S22); /* 26 */
+ GG (c, d, a, b, x[10], S23); /* 27 */
+ GG (b, c, d, a, x[14], S24); /* 28 */
+ GG (a, b, c, d, x[ 3], S21); /* 29 */
+ GG (d, a, b, c, x[ 7], S22); /* 30 */
+ GG (c, d, a, b, x[11], S23); /* 31 */
+ GG (b, c, d, a, x[15], S24); /* 32 */
+
+ /* Round 3 */
+ HH (a, b, c, d, x[ 0], S31); /* 33 */
+ HH (d, a, b, c, x[ 8], S32); /* 34 */
+ HH (c, d, a, b, x[ 4], S33); /* 35 */
+ HH (b, c, d, a, x[12], S34); /* 36 */
+ HH (a, b, c, d, x[ 2], S31); /* 37 */
+ HH (d, a, b, c, x[10], S32); /* 38 */
+ HH (c, d, a, b, x[ 6], S33); /* 39 */
+ HH (b, c, d, a, x[14], S34); /* 40 */
+ HH (a, b, c, d, x[ 1], S31); /* 41 */
+ HH (d, a, b, c, x[ 9], S32); /* 42 */
+ HH (c, d, a, b, x[ 5], S33); /* 43 */
+ HH (b, c, d, a, x[13], S34); /* 44 */
+ HH (a, b, c, d, x[ 3], S31); /* 45 */
+ HH (d, a, b, c, x[11], S32); /* 46 */
+ HH (c, d, a, b, x[ 7], S33); /* 47 */
+ HH (b, c, d, a, x[15], S34); /* 48 */
+
+ state[0] += a;
+ state[1] += b;
+ state[2] += c;
+ state[3] += d;
+
+ /* Zeroize sensitive information.
+ */
+ memset ((unsigned char *)x, 0, sizeof (x));
+}
+
+/* Encodes input (uint32_t) into output (unsigned char). Assumes len is
+ a multiple of 4.
+ */
+static void Encode (unsigned char *output, uint32_t *input, unsigned int len)
+{
+ unsigned int i, j;
+
+ for (i = 0, j = 0; j < len; i++, j += 4) {
+ output[j] = (unsigned char)(input[i] & 0xff);
+ output[j+1] = (unsigned char)((input[i] >> 8) & 0xff);
+ output[j+2] = (unsigned char)((input[i] >> 16) & 0xff);
+ output[j+3] = (unsigned char)((input[i] >> 24) & 0xff);
+ }
+}
+
+/* Decodes input (unsigned char) into output (uint32_t). Assumes len is
+ a multiple of 4.
+ */
+static void Decode (uint32_t *output, unsigned char *input, unsigned int len)
+{
+ unsigned int i, j;
+
+ for (i = 0, j = 0; j < len; i++, j += 4)
+ output[i] = ((uint32_t)input[j]) | (((uint32_t)input[j+1]) << 8) |
+ (((uint32_t)input[j+2]) << 16) | (((uint32_t)input[j+3]) << 24);
+}
--- /dev/null
+/* Copyright (C) 1991-2, RSA Data Security, Inc. Created 1991. All
+ rights reserved.
+
+ License to copy and use this software is granted provided that it
+ is identified as the "RSA Data Security, Inc. MD4 Message-Digest
+ Algorithm" in all material mentioning or referencing this software
+ or this function.
+
+ License is also granted to make and use derivative works provided
+ that such works are identified as "derived from the RSA Data
+ Security, Inc. MD4 Message-Digest Algorithm" in all material
+ mentioning or referencing the derived work.
+
+ RSA Data Security, Inc. makes no representations concerning either
+ the merchantability of this software or the suitability of this
+ software for any particular purpose. It is provided "as is"
+ without express or implied warranty of any kind.
+
+ These notices must be retained in any copies of any part of this
+ documentation and/or software.
+ */
+
+#include <inttypes.h>
+
+/* MD4 context. */
+typedef struct {
+ uint32_t state[4]; /* state (ABCD) */
+ uint32_t count[2]; /* number of bits, modulo 2^64 (lsb first) */
+ unsigned char buffer[64]; /* input buffer */
+} MD4_CTX;
+
+void MD4Init(MD4_CTX *);
+void MD4Update(MD4_CTX *, unsigned char *, unsigned int);
+void MD4Final(unsigned char [16], MD4_CTX *);
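+
+/* Usage sketch (illustrative only; "data" and "len" are placeholder names):
+ *
+ *   MD4_CTX ctx;
+ *   unsigned char digest[16];
+ *
+ *   MD4Init(&ctx);
+ *   MD4Update(&ctx, (unsigned char*)data, len);   (may be called repeatedly)
+ *   MD4Final(digest, &ctx);                       (writes 16 bytes and zeroizes ctx)
+ */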
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <errno.h>
+#include "assman_impl.h"
+#include "tpool.h"
+#include "md4.h"
#ifdef BUILD_MOD_URL
+#include <pthread.h>
#include <curl/curl.h>
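+
+/* Download state machine: fop_open blocks while the state is DL_UNKNOWN;
+ * the worker thread moves it to DL_STARTED when the first data arrives,
+ * and to DL_DONE or DL_ERROR when the transfer finishes or fails.
+ */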
+enum {
+ DL_UNKNOWN,
+ DL_STARTED,
+ DL_ERROR,
+ DL_DONE
+};
+
+struct file_info {
+ char *url;
+ char *cache_fname;
+
+ FILE *cache_file;
+
+ /* fopen-thread waits until the state becomes known (request starts transmitting or fails) */
+ int state;
+ pthread_cond_t state_cond;
+ pthread_mutex_t state_mutex;
+};
+
+static void *fop_open(const char *fname, void *udata);
+static void fop_close(void *fp, void *udata);
+static long fop_seek(void *fp, long offs, int whence, void *udata);
+static long fop_read(void *fp, void *buf, long size, void *udata);
+
+static void exit_cleanup(void);
+static void download(void *data);
+static size_t recv_callback(char *ptr, size_t size, size_t nmemb, void *udata);
static const char *get_temp_dir(void);
static char *tmpdir, *cachedir;
+static struct thread_pool *tpool;
+static CURL **curl;
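+
+/* Create a fileops struct for assets fetched relative to the given URL prefix.
+ * The first call also initializes libcurl, the cache directory, one curl easy
+ * handle per worker thread, and the download thread pool.
+ */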
struct ass_fileops *ass_alloc_url(const char *url)
{
static int done_init;
struct ass_fileops *fop;
+ int i;
if(!done_init) {
curl_global_init(CURL_GLOBAL_ALL);
- atexit(curl_global_cleanup);
- done_init = 1;
+ atexit(exit_cleanup);
if(!*ass_mod_url_cachedir) {
strcpy(ass_mod_url_cachedir, "assman_cache");
}
- tmpdir = get_temp_dir();
+ tmpdir = (char*)get_temp_dir();
if(!(cachedir = malloc(strlen(ass_mod_url_cachedir) + strlen(tmpdir) + 2))) {
fprintf(stderr, "assman: failed to allocate cachedir path buffer\n");
- return 0;
+ goto init_failed;
}
sprintf(cachedir, "%s/%s", tmpdir, ass_mod_url_cachedir);
+
+ if(ass_mod_url_max_threads <= 0) {
+ ass_mod_url_max_threads = 8;
+ }
+
+ if(!(curl = calloc(ass_mod_url_max_threads, sizeof *curl))) {
+ perror("assman: failed to allocate curl context table");
+ goto init_failed;
+ }
+ for(i=0; i<ass_mod_url_max_threads; i++) {
+ if(!(curl[i] = curl_easy_init())) {
+ goto init_failed;
+ }
+ curl_easy_setopt(curl[i], CURLOPT_WRITEFUNCTION, recv_callback);
+ }
+
+ if(!(tpool = ass_tpool_create(ass_mod_url_max_threads))) {
+ fprintf(stderr, "assman: failed to create thread pool\n");
+ goto init_failed;
+ }
+
+ done_init = 1;
}
if(!(fop = malloc(sizeof *fop))) {
return 0;
}
+ if(!(fop->udata = malloc(strlen(url) + 1))) {
+ free(fop);
+ return 0;
+ }
+ strcpy(fop->udata, url);
- fop->udata = 0;
fop->open = fop_open;
fop->close = fop_close;
fop->seek = fop_seek;
fop->read = fop_read;
return fop;
+
+init_failed:
+ free(cachedir);
+ if(curl) {
+ for(i=0; i<ass_mod_url_max_threads; i++) {
+ if(curl[i]) {
+ curl_easy_cleanup(curl[i]);
+ }
+ }
+		free(curl);
+		curl = 0;
+	}
+ return 0;
}
+static void exit_cleanup(void)
+{
+ int i;
+
+ if(tpool) {
+ ass_tpool_destroy(tpool);
+ }
+ if(curl) {
+ for(i=0; i<ass_mod_url_max_threads; i++) {
+ if(curl[i]) {
+ curl_easy_cleanup(curl[i]);
+ }
+ }
+ free(curl);
+ }
+ curl_global_cleanup();
+}
+
+
void ass_free_url(struct ass_fileops *fop)
{
}
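+
+/* Map "url_prefix/fname" to a unique cache file path of the form
+ * "<cachedir>/<fname>-<32 hex digits of the MD4 of the full URL>".
+ */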
+static char *cache_filename(const char *fname, const char *url_prefix)
+{
+ MD4_CTX md4ctx;
+ unsigned char sum[16];
+ char sumstr[33];
+ char *resfname;
+ int i;
+ int fname_len = strlen(fname);
+ int prefix_len = strlen(url_prefix);
+ int url_len = fname_len + prefix_len + 1;
+
+ char *url = alloca(url_len + 1);
+ sprintf(url, "%s/%s", url_prefix, fname);
+
+ MD4Init(&md4ctx);
+ MD4Update(&md4ctx, (unsigned char*)url, url_len);
+ MD4Final((unsigned char*)sum, &md4ctx);
+
+ for(i=0; i<16; i++) {
+		sprintf(sumstr + i * 2, "%02x", (unsigned int)sum[i]);
+ }
+ sumstr[32] = 0;
+
+ prefix_len = strlen(cachedir);
+	if(!(resfname = malloc(prefix_len + fname_len + 35))) {	/* '/' + '-' + 32 hex + nul */
+ return 0;
+ }
+ sprintf(resfname, "%s/%s-%s", cachedir, fname, sumstr);
+ return resfname;
+}
+
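+/* Open an asset: create its cache file, hand a download job to the thread
+ * pool, then block until the transfer either starts or fails outright.
+ */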
static void *fop_open(const char *fname, void *udata)
{
- return 0; /* TODO */
+ struct file_info *file;
+ int state;
+
+	if(!(file = malloc(sizeof *file))) {
+		ass_errno = ENOMEM;
+		return 0;
+	}
+	/* build the full URL for the download worker */
+	if(!(file->url = malloc(strlen((char*)udata) + strlen(fname) + 2))) {
+		free(file);
+		ass_errno = ENOMEM;
+		return 0;
+	}
+	sprintf(file->url, "%s/%s", (char*)udata, fname);
+	if(!(file->cache_fname = cache_filename(fname, udata))) {
+		free(file->url);
+		free(file);
+		ass_errno = ENOMEM;
+		return 0;
+	}
+	if(!(file->cache_file = fopen(file->cache_fname, "wb"))) {
+		fprintf(stderr, "assman: mod_url: failed to open cache file (%s) for writing: %s\n",
+				file->cache_fname, strerror(errno));
+		ass_errno = errno;
+		free(file->cache_fname);
+		free(file->url);
+		free(file);
+		return 0;
+	}
+
+ file->state = DL_UNKNOWN;
+ pthread_mutex_init(&file->state_mutex, 0);
+ pthread_cond_init(&file->state_cond, 0);
+
+ ass_tpool_enqueue(tpool, file, download, 0);
+
+ /* wait until the file changes state */
+ pthread_mutex_lock(&file->state_mutex);
+ while(file->state == DL_UNKNOWN) {
+ pthread_cond_wait(&file->state_cond, &file->state_mutex);
+ }
+ state = file->state;
+ pthread_mutex_unlock(&file->state_mutex);
+
+	if(state == DL_ERROR) {
+		/* the worker stopped, so we can safely clean up and return failure */
+		fclose(file->cache_file);
+		remove(file->cache_fname);
+		free(file->cache_fname);
+		free(file->url);
+		pthread_cond_destroy(&file->state_cond);
+		pthread_mutex_destroy(&file->state_mutex);
+		free(file);
+		ass_errno = ENOENT;	/* TODO: differentiate between 403 and 404 */
+		return 0;
+	}
+ return file;
+}
+
+static void wait_done(struct file_info *file)
+{
+ pthread_mutex_lock(&file->state_mutex);
+ while(file->state != DL_DONE && file->state != DL_ERROR) {
+ pthread_cond_wait(&file->state_cond, &file->state_mutex);
+ }
+ pthread_mutex_unlock(&file->state_mutex);
}
static void fop_close(void *fp, void *udata)
{
+ struct file_info *file = fp;
+
+ wait_done(file); /* TODO: stop download instead of waiting to finish */
+
+	fclose(file->cache_file);
+	if(file->state == DL_ERROR) remove(file->cache_fname);
+	free(file->cache_fname);
+	free(file->url);
+	pthread_cond_destroy(&file->state_cond);
+	pthread_mutex_destroy(&file->state_mutex);
+	free(file);
}
static long fop_seek(void *fp, long offs, int whence, void *udata)
{
- return 0;
+ struct file_info *file = fp;
+ wait_done(file);
+
+ fseek(file->cache_file, offs, whence);
+ return ftell(file->cache_file);
}
static long fop_read(void *fp, void *buf, long size, void *udata)
{
- return 0;
+ struct file_info *file = fp;
+ wait_done(file);
+
+ return fread(buf, 1, size, file->cache_file);
}
-#else
+/* this is the function called by the worker threads to perform the download,
+ * signal state changes, and prepare the cache file for reading
+ */
+static void download(void *data)
+{
+ int tid, res;
+ struct file_info *file = data;
+
+ tid = ass_tpool_thread_id(tpool);
+
+ curl_easy_setopt(curl[tid], CURLOPT_URL, file->url);
+ curl_easy_setopt(curl[tid], CURLOPT_WRITEDATA, file);
+ res = curl_easy_perform(curl[tid]);
+
+ pthread_mutex_lock(&file->state_mutex);
+ if(res == CURLE_OK) {
+ file->state = DL_DONE;
+ fclose(file->cache_file);
+ if(!(file->cache_file = fopen(file->cache_fname, "rb"))) {
+ fprintf(stderr, "assman: failed to reopen cache file (%s) for reading: %s\n",
+ file->cache_fname, strerror(errno));
+ file->state = DL_ERROR;
+ }
+ } else {
+ file->state = DL_ERROR;
+ }
+ pthread_cond_broadcast(&file->state_cond);
+ pthread_mutex_unlock(&file->state_mutex);
+}
+
+/* this function is called by curl to pass along downloaded data chunks */
+static size_t recv_callback(char *ptr, size_t size, size_t count, void *udata)
+{
+ struct file_info *file = udata;
+
+ pthread_mutex_lock(&file->state_mutex);
+ if(file->state == DL_UNKNOWN) {
+ file->state = DL_STARTED;
+ pthread_cond_broadcast(&file->state_cond);
+ }
+ pthread_mutex_unlock(&file->state_mutex);
+
+	return fwrite(ptr, 1, size * count, file->cache_file);
+}
+
+#ifdef WIN32
+#include <windows.h>
+
+static const char *get_temp_dir(void)
+{
+ static char buf[MAX_PATH + 1];
+ GetTempPathA(MAX_PATH + 1, buf);
+ return buf;
+}
+#else /* UNIX */
+static const char *get_temp_dir(void)
+{
+ char *env = getenv("TMPDIR");
+ return env ? env : "/tmp";
+}
+#endif
+
+
+
+#else /* don't build mod_url */
struct ass_fileops *ass_alloc_url(const char *url)
{
fprintf(stderr, "assman: compiled without URL asset source support\n");
--- /dev/null
+/* worker thread pool based on POSIX threads
+ * author: John Tsiombikas <nuclear@member.fsf.org>
+ * This code is public domain.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>	/* intptr_t */
+#include <errno.h>
+#include <pthread.h>
+#include "tpool.h"
+
+#if defined(__APPLE__) && defined(__MACH__)
+# ifndef __unix__
+# define __unix__ 1
+# endif /* unix */
+# ifndef __bsd__
+# define __bsd__ 1
+# endif /* bsd */
+#endif /* apple */
+
+#if defined(unix) || defined(__unix__)
+#include <unistd.h>
+#include <sys/time.h>
+
+# ifdef __bsd__
+# include <sys/sysctl.h>
+# endif
+#endif
+
+#if defined(WIN32) || defined(__WIN32__)
+#include <windows.h>
+#endif
+
+
+struct work_item {
+ void *data;
+ tpool_callback work, done;
+ struct work_item *next;
+};
+
+struct thread_data {
+ int id;
+ struct thread_pool *pool;
+};
+
+struct thread_pool {
+ pthread_t *threads;
+ struct thread_data *tdata;
+ int num_threads;
+ pthread_key_t idkey;
+
+ int qsize;
+ struct work_item *workq, *workq_tail;
+ pthread_mutex_t workq_mutex;
+ pthread_cond_t workq_condvar;
+
+ int nactive; /* number of active workers (not sleeping) */
+
+ pthread_cond_t done_condvar;
+
+ int should_quit;
+ int in_batch;
+
+ int nref; /* reference count */
+
+#if defined(WIN32) || defined(__WIN32__)
+ HANDLE wait_event;
+#else
+ int wait_pipe[2];
+#endif
+};
+
+static void *thread_func(void *args);
+static void send_done_event(struct thread_pool *tpool);
+
+static struct work_item *alloc_work_item(void);
+static void free_work_item(struct work_item *w);
+
+
+struct thread_pool *ass_tpool_create(int num_threads)
+{
+ int i;
+ struct thread_pool *tpool;
+
+ if(!(tpool = calloc(1, sizeof *tpool))) {
+ return 0;
+ }
+ pthread_mutex_init(&tpool->workq_mutex, 0);
+ pthread_cond_init(&tpool->workq_condvar, 0);
+ pthread_cond_init(&tpool->done_condvar, 0);
+ pthread_key_create(&tpool->idkey, 0);
+
+ pthread_setspecific(tpool->idkey, (void*)0xffffffff);
+
+#if !defined(WIN32) && !defined(__WIN32__)
+ tpool->wait_pipe[0] = tpool->wait_pipe[1] = -1;
+#endif
+
+ if(num_threads <= 0) {
+ num_threads = ass_tpool_num_processors();
+ }
+ tpool->num_threads = num_threads;
+
+ if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) {
+ free(tpool);
+ return 0;
+ }
+ if(!(tpool->tdata = malloc(num_threads * sizeof *tpool->tdata))) {
+ free(tpool->threads);
+ free(tpool);
+ return 0;
+ }
+
+ for(i=0; i<num_threads; i++) {
+ tpool->tdata[i].id = i;
+ tpool->tdata[i].pool = tpool;
+
+		if(pthread_create(tpool->threads + i, 0, thread_func, tpool->tdata + i) != 0) {
+ /*tpool->threads[i] = 0;*/
+ ass_tpool_destroy(tpool);
+ return 0;
+ }
+ }
+ return tpool;
+}
+
+void ass_tpool_destroy(struct thread_pool *tpool)
+{
+ int i;
+ if(!tpool) return;
+
+ ass_tpool_clear(tpool);
+ tpool->should_quit = 1;
+
+ pthread_cond_broadcast(&tpool->workq_condvar);
+
+ if(tpool->threads) {
+ printf("thread_pool: waiting for %d worker threads to stop ", tpool->num_threads);
+ fflush(stdout);
+
+ for(i=0; i<tpool->num_threads; i++) {
+ pthread_join(tpool->threads[i], 0);
+ putchar('.');
+ fflush(stdout);
+ }
+ putchar('\n');
+ free(tpool->threads);
+ }
+ free(tpool->tdata);
+
+ /* also wake up anyone waiting on the wait* calls */
+ tpool->nactive = 0;
+ pthread_cond_broadcast(&tpool->done_condvar);
+ send_done_event(tpool);
+
+ pthread_mutex_destroy(&tpool->workq_mutex);
+ pthread_cond_destroy(&tpool->workq_condvar);
+ pthread_cond_destroy(&tpool->done_condvar);
+ pthread_key_delete(tpool->idkey);
+
+#if defined(WIN32) || defined(__WIN32__)
+ if(tpool->wait_event) {
+ CloseHandle(tpool->wait_event);
+ }
+#else
+ if(tpool->wait_pipe[0] >= 0) {
+ close(tpool->wait_pipe[0]);
+ close(tpool->wait_pipe[1]);
+ }
+#endif
+
+	free(tpool);
+}
+
+int ass_tpool_addref(struct thread_pool *tpool)
+{
+ return ++tpool->nref;
+}
+
+int ass_tpool_release(struct thread_pool *tpool)
+{
+ if(--tpool->nref <= 0) {
+ ass_tpool_destroy(tpool);
+ return 0;
+ }
+ return tpool->nref;
+}
+
+void ass_tpool_begin_batch(struct thread_pool *tpool)
+{
+ tpool->in_batch = 1;
+}
+
+void ass_tpool_end_batch(struct thread_pool *tpool)
+{
+ tpool->in_batch = 0;
+ pthread_cond_broadcast(&tpool->workq_condvar);
+}
+
+int ass_tpool_enqueue(struct thread_pool *tpool, void *data,
+ tpool_callback work_func, tpool_callback done_func)
+{
+ struct work_item *job;
+
+ if(!(job = alloc_work_item())) {
+ return -1;
+ }
+ job->work = work_func;
+ job->done = done_func;
+ job->data = data;
+ job->next = 0;
+
+ pthread_mutex_lock(&tpool->workq_mutex);
+ if(tpool->workq) {
+ tpool->workq_tail->next = job;
+ tpool->workq_tail = job;
+ } else {
+ tpool->workq = tpool->workq_tail = job;
+ }
+ ++tpool->qsize;
+ pthread_mutex_unlock(&tpool->workq_mutex);
+
+ if(!tpool->in_batch) {
+ pthread_cond_broadcast(&tpool->workq_condvar);
+ }
+ return 0;
+}
+
+void ass_tpool_clear(struct thread_pool *tpool)
+{
+ pthread_mutex_lock(&tpool->workq_mutex);
+ while(tpool->workq) {
+ void *tmp = tpool->workq;
+ tpool->workq = tpool->workq->next;
+ free(tmp);
+ }
+ tpool->workq = tpool->workq_tail = 0;
+ tpool->qsize = 0;
+ pthread_mutex_unlock(&tpool->workq_mutex);
+}
+
+int ass_tpool_queued_jobs(struct thread_pool *tpool)
+{
+ int res;
+ pthread_mutex_lock(&tpool->workq_mutex);
+ res = tpool->qsize;
+ pthread_mutex_unlock(&tpool->workq_mutex);
+ return res;
+}
+
+int ass_tpool_active_jobs(struct thread_pool *tpool)
+{
+ int res;
+ pthread_mutex_lock(&tpool->workq_mutex);
+ res = tpool->nactive;
+ pthread_mutex_unlock(&tpool->workq_mutex);
+ return res;
+}
+
+int ass_tpool_pending_jobs(struct thread_pool *tpool)
+{
+ int res;
+ pthread_mutex_lock(&tpool->workq_mutex);
+ res = tpool->qsize + tpool->nactive;
+ pthread_mutex_unlock(&tpool->workq_mutex);
+ return res;
+}
+
+void ass_tpool_wait(struct thread_pool *tpool)
+{
+ pthread_mutex_lock(&tpool->workq_mutex);
+ while(tpool->nactive || tpool->qsize) {
+ pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
+ }
+ pthread_mutex_unlock(&tpool->workq_mutex);
+}
+
+void ass_tpool_wait_pending(struct thread_pool *tpool, int pending_target)
+{
+ pthread_mutex_lock(&tpool->workq_mutex);
+ while(tpool->qsize + tpool->nactive > pending_target) {
+ pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex);
+ }
+ pthread_mutex_unlock(&tpool->workq_mutex);
+}
+
+#if defined(WIN32) || defined(__WIN32__)
+long ass_tpool_timedwait(struct thread_pool *tpool, long timeout)
+{
+ fprintf(stderr, "tpool_timedwait currently unimplemented on windows\n");
+ abort();
+ return 0;
+}
+
+/* TODO: does this actually work with MinGW? */
+int ass_tpool_get_wait_fd(struct thread_pool *tpool)
+{
+ static int once;
+ if(!once) {
+ once = 1;
+ fprintf(stderr, "warning: ass_tpool_get_wait_fd call on Windows does nothing\n");
+ }
+ return 0;
+}
+
+void *ass_tpool_get_wait_handle(struct thread_pool *tpool)
+{
+ if(!tpool->wait_event) {
+ if(!(tpool->wait_event = CreateEvent(0, 0, 0, 0))) {
+ return 0;
+ }
+ }
+ return tpool->wait_event;
+}
+
+static void send_done_event(struct thread_pool *tpool)
+{
+ if(tpool->wait_event) {
+ SetEvent(tpool->wait_event);
+ }
+}
+
+#else /* UNIX */
+
+long ass_tpool_timedwait(struct thread_pool *tpool, long timeout)
+{
+	struct timespec tout_ts;
+	struct timeval tv0, tv;
+	long sec;
+
+	gettimeofday(&tv0, 0);
+
+	sec = timeout / 1000;
+	tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000;
+	tout_ts.tv_sec = tv0.tv_sec + sec;
+	/* normalize: tv_nsec must be less than a full second */
+	if(tout_ts.tv_nsec >= 1000000000) {
+		tout_ts.tv_sec += tout_ts.tv_nsec / 1000000000;
+		tout_ts.tv_nsec %= 1000000000;
+	}
+
+ pthread_mutex_lock(&tpool->workq_mutex);
+ while(tpool->nactive || tpool->qsize) {
+ if(pthread_cond_timedwait(&tpool->done_condvar,
+ &tpool->workq_mutex, &tout_ts) == ETIMEDOUT) {
+ break;
+ }
+ }
+ pthread_mutex_unlock(&tpool->workq_mutex);
+
+ gettimeofday(&tv, 0);
+ return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000;
+}
+
+int ass_tpool_get_wait_fd(struct thread_pool *tpool)
+{
+ if(tpool->wait_pipe[0] < 0) {
+ if(pipe(tpool->wait_pipe) == -1) {
+ return -1;
+ }
+ }
+ return tpool->wait_pipe[0];
+}
+
+void *ass_tpool_get_wait_handle(struct thread_pool *tpool)
+{
+ static int once;
+ if(!once) {
+ once = 1;
+ fprintf(stderr, "warning: ass_tpool_get_wait_handle call on UNIX does nothing\n");
+ }
+ return 0;
+}
+
+static void send_done_event(struct thread_pool *tpool)
+{
+ if(tpool->wait_pipe[1] >= 0) {
+ write(tpool->wait_pipe[1], tpool, 1);
+ }
+}
+#endif /* WIN32/UNIX */
+
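+/* Worker thread main loop: sleep until work is queued or shutdown is
+ * requested, then drain the queue, running each job with the mutex released.
+ */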
+static void *thread_func(void *args)
+{
+ struct thread_data *tdata = args;
+ struct thread_pool *tpool = tdata->pool;
+
+ pthread_setspecific(tpool->idkey, (void*)(intptr_t)tdata->id);
+
+ pthread_mutex_lock(&tpool->workq_mutex);
+ while(!tpool->should_quit) {
+ if(!tpool->workq) {
+ pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex);
+ if(tpool->should_quit) break;
+ }
+
+ while(!tpool->should_quit && tpool->workq) {
+ /* grab the first job */
+ struct work_item *job = tpool->workq;
+ tpool->workq = tpool->workq->next;
+ if(!tpool->workq)
+ tpool->workq_tail = 0;
+ ++tpool->nactive;
+ --tpool->qsize;
+ pthread_mutex_unlock(&tpool->workq_mutex);
+
+ /* do the job */
+ job->work(job->data);
+ if(job->done) {
+ job->done(job->data);
+ }
+ free_work_item(job);
+
+ pthread_mutex_lock(&tpool->workq_mutex);
+ /* notify everyone interested that we're done with this job */
+ pthread_cond_broadcast(&tpool->done_condvar);
+ send_done_event(tpool);
+ --tpool->nactive;
+ }
+ }
+ pthread_mutex_unlock(&tpool->workq_mutex);
+
+ return 0;
+}
+
+
+int ass_tpool_thread_id(struct thread_pool *tpool)
+{
+ int id = (intptr_t)pthread_getspecific(tpool->idkey);
+ if(id >= tpool->num_threads) {
+ return -1;
+ }
+ return id;
+}
+
+
+/* The following highly platform-specific code detects the number
+ * of processors available in the system. It's used by the thread pool
+ * to autodetect how many threads to spawn.
+ * Currently works on: Linux, BSD, Darwin, and Windows.
+ */
+int ass_tpool_num_processors(void)
+{
+#if defined(unix) || defined(__unix__)
+# if defined(__bsd__)
+	/* BSD systems provide the number of processors through sysctl */
+ int num, mib[] = {CTL_HW, HW_NCPU};
+ size_t len = sizeof num;
+
+ sysctl(mib, 2, &num, &len, 0, 0);
+ return num;
+
+# elif defined(__sgi)
+ /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
+ return sysconf(_SC_NPROC_ONLN);
+# else
+ /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
+ return sysconf(_SC_NPROCESSORS_ONLN);
+# endif /* bsd/sgi/other */
+
+#elif defined(WIN32) || defined(__WIN32__)
+ /* under windows we need to call GetSystemInfo */
+ SYSTEM_INFO info;
+ GetSystemInfo(&info);
+ return info.dwNumberOfProcessors;
+#endif
+}
+
+#define MAX_WPOOL_SIZE 64
+static pthread_mutex_t wpool_lock = PTHREAD_MUTEX_INITIALIZER;
+static struct work_item *wpool;
+static int wpool_size;
+
+/* work item allocator */
+static struct work_item *alloc_work_item(void)
+{
+ struct work_item *w;
+
+ pthread_mutex_lock(&wpool_lock);
+ if(!wpool) {
+ pthread_mutex_unlock(&wpool_lock);
+ return malloc(sizeof(struct work_item));
+ }
+
+ w = wpool;
+ wpool = wpool->next;
+ --wpool_size;
+ pthread_mutex_unlock(&wpool_lock);
+ return w;
+}
+
+static void free_work_item(struct work_item *w)
+{
+ pthread_mutex_lock(&wpool_lock);
+ if(wpool_size >= MAX_WPOOL_SIZE) {
+ free(w);
+ } else {
+ w->next = wpool;
+ wpool = w;
+ ++wpool_size;
+ }
+ pthread_mutex_unlock(&wpool_lock);
+}
--- /dev/null
+/* worker thread pool based on POSIX threads
+ * author: John Tsiombikas <nuclear@member.fsf.org>
+ * This code is public domain.
+ */
+#ifndef ASSMAN_THREADPOOL_H_
+#define ASSMAN_THREADPOOL_H_
+
+struct thread_pool;
+
+/* type of the function accepted as work or completion callback */
+typedef void (*tpool_callback)(void*);
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* if num_threads <= 0, auto-detect how many threads to spawn */
+struct thread_pool *ass_tpool_create(int num_threads);
+void ass_tpool_destroy(struct thread_pool *tpool);
+
+/* optional reference counting interface for thread pool sharing */
+int ass_tpool_addref(struct thread_pool *tpool);
+int ass_tpool_release(struct thread_pool *tpool); /* will ass_tpool_destroy on nref 0 */
+
+/* if begin_batch is called before an enqueue, the worker threads will not be
+ * signalled to start working until end_batch is called.
+ */
+void ass_tpool_begin_batch(struct thread_pool *tpool);
+void ass_tpool_end_batch(struct thread_pool *tpool);
+
+/* if enqueue is called without calling begin_batch first, it will immediately
+ * wake up the worker threads to start working on the enqueued item
+ */
+int ass_tpool_enqueue(struct thread_pool *tpool, void *data,
+ tpool_callback work_func, tpool_callback done_func);
+/* clear the work queue. does not cancel any currently running jobs */
+void ass_tpool_clear(struct thread_pool *tpool);
+
+/* returns the number of queued work items */
+int ass_tpool_queued_jobs(struct thread_pool *tpool);
+/* returns the number of active (working) threads */
+int ass_tpool_active_jobs(struct thread_pool *tpool);
+/* returns the number of pending jobs, both in queue and active */
+int ass_tpool_pending_jobs(struct thread_pool *tpool);
+
+/* wait for all pending jobs to be completed */
+void ass_tpool_wait(struct thread_pool *tpool);
+/* wait until the pending jobs are down to the target specified
+ * for example, to wait until a single job has been completed:
+ * ass_tpool_wait_pending(tpool, ass_tpool_pending_jobs(tpool) - 1);
+ * this interface is slightly awkward to avoid race conditions. */
+void ass_tpool_wait_pending(struct thread_pool *tpool, int pending_target);
+/* wait for all pending jobs to be completed for up to "timeout" milliseconds */
+long ass_tpool_timedwait(struct thread_pool *tpool, long timeout);
+
+/* return a file descriptor which can be used to wait for pending job
+ * completion events. A single char is written every time a job completes.
+ * You should empty the pipe every time you receive such an event.
+ *
+ * This is a UNIX-specific call. On windows it does nothing.
+ */
+int ass_tpool_get_wait_fd(struct thread_pool *tpool);
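+
+/* Hypothetical sketch of waiting on the fd (names are illustrative):
+ *
+ *   int fd = ass_tpool_get_wait_fd(tpool);
+ *   ... add fd to a select()/poll() set ...
+ *   char buf[64];
+ *   read(fd, buf, sizeof buf);    (drain the completion notifications)
+ */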
+
+/* return an auto-resetting Event HANDLE which can be used to wait for
+ * pending job completion events.
+ *
+ * This is a Win32-specific call. On UNIX it does nothing.
+ */
+void *ass_tpool_get_wait_handle(struct thread_pool *tpool);
+
+/* When called by a work/done callback, it returns the thread number executing
+ * it. From the main thread it returns -1.
+ */
+int ass_tpool_thread_id(struct thread_pool *tpool);
+
+/* returns the number of processors on the system.
+ * individual cores in multi-core processors are counted as processors.
+ */
+int ass_tpool_num_processors(void);
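+
+/* Typical usage sketch (illustrative only; my_work, my_done and job_data are
+ * placeholder names, error handling omitted):
+ *
+ *   struct thread_pool *tp = ass_tpool_create(0);      (auto-detect thread count)
+ *   ass_tpool_enqueue(tp, job_data, my_work, my_done);
+ *   ass_tpool_wait(tp);                                (block until all jobs finish)
+ *   ass_tpool_destroy(tp);
+ */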
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ASSMAN_THREADPOOL_H_ */