From 47f73ee9117686e9d3d0b5bf6c664215dfe1bf8d Mon Sep 17 00:00:00 2001 From: John Tsiombikas Date: Wed, 26 Sep 2018 08:56:07 +0300 Subject: [PATCH] mod_url progress --- .gitignore | 1 + Makefile | 6 +- src/md4.c | 259 ++++++++++++++++++++++++++++++ src/md4.h | 34 ++++ src/mod_url.c | 276 ++++++++++++++++++++++++++++++-- src/tpool.c | 493 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/tpool.h | 85 ++++++++++ 7 files changed, 1143 insertions(+), 11 deletions(-) create mode 100644 src/md4.c create mode 100644 src/md4.h create mode 100644 src/tpool.c create mode 100644 src/tpool.h diff --git a/.gitignore b/.gitignore index 17859ab..6592a48 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ *.swp *.a *.so* +examples/asscat/asscat diff --git a/Makefile b/Makefile index ab2a248..c3abbe7 100644 --- a/Makefile +++ b/Makefile @@ -15,9 +15,11 @@ shared = -shared -Wl,-soname,$(soname) warn = -pedantic -Wall dbg = -g opt = -O0 +def = -DBUILD_MOD_URL +pic = -fPIC -CFLAGS = $(warn) $(dbg) $(opt) $(inc) -LDFLAGS = -lcurl +CFLAGS = $(warn) $(dbg) $(opt) $(pic) $(def) $(inc) -pthread +LDFLAGS = -pthread -lpthread -lcurl .PHONY: all all: $(lib_so) $(lib_a) $(soname) $(devlink) diff --git a/src/md4.c b/src/md4.c new file mode 100644 index 0000000..e135d1b --- /dev/null +++ b/src/md4.c @@ -0,0 +1,259 @@ +/* MD4C.C - RSA Data Security, Inc., MD4 message-digest algorithm + */ + +/* Copyright (C) 1990-2, RSA Data Security, Inc. All rights reserved. + + License to copy and use this software is granted provided that it + is identified as the "RSA Data Security, Inc. MD4 Message-Digest + Algorithm" in all material mentioning or referencing this software + or this function. + + License is also granted to make and use derivative works provided + that such works are identified as "derived from the RSA Data + Security, Inc. MD4 Message-Digest Algorithm" in all material + mentioning or referencing the derived work. + + RSA Data Security, Inc. makes no representations concerning either + the merchantability of this software or the suitability of this + software for any particular purpose. It is provided "as is" + without express or implied warranty of any kind. + + These notices must be retained in any copies of any part of this + documentation and/or software. + */ + +#include +#include +#include "md4.h" + +/* Constants for MD4Transform routine. + */ +#define S11 3 +#define S12 7 +#define S13 11 +#define S14 19 +#define S21 3 +#define S22 5 +#define S23 9 +#define S24 13 +#define S31 3 +#define S32 9 +#define S33 11 +#define S34 15 + +static void MD4Transform (uint32_t [4], unsigned char [64]); +static void Encode (unsigned char *, uint32_t *, unsigned int); +static void Decode (uint32_t *, unsigned char *, unsigned int); + +static unsigned char PADDING[64] = { + 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 +}; + +/* F, G and H are basic MD4 functions. + */ +#define F(x, y, z) (((x) & (y)) | ((~x) & (z))) +#define G(x, y, z) (((x) & (y)) | ((x) & (z)) | ((y) & (z))) +#define H(x, y, z) ((x) ^ (y) ^ (z)) + +/* ROTATE_LEFT rotates x left n bits. + */ +#define ROTATE_LEFT(x, n) (((x) << (n)) | ((x) >> (32-(n)))) + +/* FF, GG and HH are transformations for rounds 1, 2 and 3 */ +/* Rotation is separate from addition to prevent recomputation */ + +#define FF(a, b, c, d, x, s) { \ + (a) += F ((b), (c), (d)) + (x); \ + (a) = ROTATE_LEFT ((a), (s)); \ + } +#define GG(a, b, c, d, x, s) { \ + (a) += G ((b), (c), (d)) + (x) + (uint32_t)0x5a827999; \ + (a) = ROTATE_LEFT ((a), (s)); \ + } +#define HH(a, b, c, d, x, s) { \ + (a) += H ((b), (c), (d)) + (x) + (uint32_t)0x6ed9eba1; \ + (a) = ROTATE_LEFT ((a), (s)); \ + } + +/* MD4 initialization. Begins an MD4 operation, writing a new context. + */ +void MD4Init (MD4_CTX *context) +{ + context->count[0] = context->count[1] = 0; + + /* Load magic initialization constants. + */ + context->state[0] = 0x67452301; + context->state[1] = 0xefcdab89; + context->state[2] = 0x98badcfe; + context->state[3] = 0x10325476; +} + +/* MD4 block update operation. Continues an MD4 message-digest + operation, processing another message block, and updating the + context. + */ +void MD4Update (MD4_CTX *context, unsigned char *input, unsigned int inputLen) +{ + unsigned int i, index, partLen; + + /* Compute number of bytes mod 64 */ + index = (unsigned int)((context->count[0] >> 3) & 0x3F); + /* Update number of bits */ + if ((context->count[0] += ((uint32_t)inputLen << 3)) + < ((uint32_t)inputLen << 3)) + context->count[1]++; + context->count[1] += ((uint32_t)inputLen >> 29); + + partLen = 64 - index; + + /* Transform as many times as possible. + */ + if (inputLen >= partLen) { + memcpy + ((unsigned char *)&context->buffer[index], (unsigned char *)input, partLen); + MD4Transform (context->state, context->buffer); + + for (i = partLen; i + 63 < inputLen; i += 64) + MD4Transform (context->state, &input[i]); + + index = 0; + } + else + i = 0; + + /* Buffer remaining input */ + memcpy + ((unsigned char *)&context->buffer[index], (unsigned char *)&input[i], + inputLen-i); +} + +/* MD4 finalization. Ends an MD4 message-digest operation, writing the + the message digest and zeroizing the context. + */ +void MD4Final (unsigned char digest[16], MD4_CTX *context) +{ + unsigned char bits[8]; + unsigned int index, padLen; + + /* Save number of bits */ + Encode (bits, context->count, 8); + + /* Pad out to 56 mod 64. + */ + index = (unsigned int)((context->count[0] >> 3) & 0x3f); + padLen = (index < 56) ? (56 - index) : (120 - index); + MD4Update (context, PADDING, padLen); + + /* Append length (before padding) */ + MD4Update (context, bits, 8); + /* Store state in digest */ + Encode (digest, context->state, 16); + + /* Zeroize sensitive information. + */ + memset ((unsigned char *)context, 0, sizeof (*context)); + +} + +/* MD4 basic transformation. Transforms state based on block. + */ +static void MD4Transform (uint32_t state[4], unsigned char block[64]) +{ + uint32_t a = state[0], b = state[1], c = state[2], d = state[3], x[16]; + + Decode (x, block, 64); + + /* Round 1 */ + FF (a, b, c, d, x[ 0], S11); /* 1 */ + FF (d, a, b, c, x[ 1], S12); /* 2 */ + FF (c, d, a, b, x[ 2], S13); /* 3 */ + FF (b, c, d, a, x[ 3], S14); /* 4 */ + FF (a, b, c, d, x[ 4], S11); /* 5 */ + FF (d, a, b, c, x[ 5], S12); /* 6 */ + FF (c, d, a, b, x[ 6], S13); /* 7 */ + FF (b, c, d, a, x[ 7], S14); /* 8 */ + FF (a, b, c, d, x[ 8], S11); /* 9 */ + FF (d, a, b, c, x[ 9], S12); /* 10 */ + FF (c, d, a, b, x[10], S13); /* 11 */ + FF (b, c, d, a, x[11], S14); /* 12 */ + FF (a, b, c, d, x[12], S11); /* 13 */ + FF (d, a, b, c, x[13], S12); /* 14 */ + FF (c, d, a, b, x[14], S13); /* 15 */ + FF (b, c, d, a, x[15], S14); /* 16 */ + + /* Round 2 */ + GG (a, b, c, d, x[ 0], S21); /* 17 */ + GG (d, a, b, c, x[ 4], S22); /* 18 */ + GG (c, d, a, b, x[ 8], S23); /* 19 */ + GG (b, c, d, a, x[12], S24); /* 20 */ + GG (a, b, c, d, x[ 1], S21); /* 21 */ + GG (d, a, b, c, x[ 5], S22); /* 22 */ + GG (c, d, a, b, x[ 9], S23); /* 23 */ + GG (b, c, d, a, x[13], S24); /* 24 */ + GG (a, b, c, d, x[ 2], S21); /* 25 */ + GG (d, a, b, c, x[ 6], S22); /* 26 */ + GG (c, d, a, b, x[10], S23); /* 27 */ + GG (b, c, d, a, x[14], S24); /* 28 */ + GG (a, b, c, d, x[ 3], S21); /* 29 */ + GG (d, a, b, c, x[ 7], S22); /* 30 */ + GG (c, d, a, b, x[11], S23); /* 31 */ + GG (b, c, d, a, x[15], S24); /* 32 */ + + /* Round 3 */ + HH (a, b, c, d, x[ 0], S31); /* 33 */ + HH (d, a, b, c, x[ 8], S32); /* 34 */ + HH (c, d, a, b, x[ 4], S33); /* 35 */ + HH (b, c, d, a, x[12], S34); /* 36 */ + HH (a, b, c, d, x[ 2], S31); /* 37 */ + HH (d, a, b, c, x[10], S32); /* 38 */ + HH (c, d, a, b, x[ 6], S33); /* 39 */ + HH (b, c, d, a, x[14], S34); /* 40 */ + HH (a, b, c, d, x[ 1], S31); /* 41 */ + HH (d, a, b, c, x[ 9], S32); /* 42 */ + HH (c, d, a, b, x[ 5], S33); /* 43 */ + HH (b, c, d, a, x[13], S34); /* 44 */ + HH (a, b, c, d, x[ 3], S31); /* 45 */ + HH (d, a, b, c, x[11], S32); /* 46 */ + HH (c, d, a, b, x[ 7], S33); /* 47 */ + HH (b, c, d, a, x[15], S34); /* 48 */ + + state[0] += a; + state[1] += b; + state[2] += c; + state[3] += d; + + /* Zeroize sensitive information. + */ + memset ((unsigned char *)x, 0, sizeof (x)); +} + +/* Encodes input (uint32_t) into output (unsigned char). Assumes len is + a multiple of 4. + */ +static void Encode (unsigned char *output, uint32_t *input, unsigned int len) +{ + unsigned int i, j; + + for (i = 0, j = 0; j < len; i++, j += 4) { + output[j] = (unsigned char)(input[i] & 0xff); + output[j+1] = (unsigned char)((input[i] >> 8) & 0xff); + output[j+2] = (unsigned char)((input[i] >> 16) & 0xff); + output[j+3] = (unsigned char)((input[i] >> 24) & 0xff); + } +} + +/* Decodes input (unsigned char) into output (uint32_t). Assumes len is + a multiple of 4. + + */ +static void Decode (uint32_t *output, unsigned char *input, unsigned int len) +{ + unsigned int i, j; + + for (i = 0, j = 0; j < len; i++, j += 4) + output[i] = ((uint32_t)input[j]) | (((uint32_t)input[j+1]) << 8) | + (((uint32_t)input[j+2]) << 16) | (((uint32_t)input[j+3]) << 24); +} diff --git a/src/md4.h b/src/md4.h new file mode 100644 index 0000000..26f7bc5 --- /dev/null +++ b/src/md4.h @@ -0,0 +1,34 @@ +/* Copyright (C) 1991-2, RSA Data Security, Inc. Created 1991. All + rights reserved. + + License to copy and use this software is granted provided that it + is identified as the "RSA Data Security, Inc. MD4 Message-Digest + Algorithm" in all material mentioning or referencing this software + or this function. + + License is also granted to make and use derivative works provided + that such works are identified as "derived from the RSA Data + Security, Inc. MD4 Message-Digest Algorithm" in all material + mentioning or referencing the derived work. + + RSA Data Security, Inc. makes no representations concerning either + the merchantability of this software or the suitability of this + software for any particular purpose. It is provided "as is" + without express or implied warranty of any kind. + + These notices must be retained in any copies of any part of this + documentation and/or software. + */ + +#include + +/* MD4 context. */ +typedef struct { + uint32_t state[4]; /* state (ABCD) */ + uint32_t count[2]; /* number of bits, modulo 2^64 (lsb first) */ + unsigned char buffer[64]; /* input buffer */ +} MD4_CTX; + +void MD4Init(MD4_CTX *); +void MD4Update(MD4_CTX *, unsigned char *, unsigned int); +void MD4Final(unsigned char [16], MD4_CTX *); diff --git a/src/mod_url.c b/src/mod_url.c index c8c62cb..5c0f81f 100644 --- a/src/mod_url.c +++ b/src/mod_url.c @@ -1,71 +1,329 @@ #include #include #include +#include +#include "assman_impl.h" +#include "tpool.h" +#include "md4.h" #ifdef BUILD_MOD_URL +#include #include +enum { + DL_UNKNOWN, + DL_STARTED, + DL_ERROR, + DL_DONE +}; + +struct file_info { + char *url; + char *cache_fname; + + FILE *cache_file; + + /* fopen-thread waits until the state becomes known (request starts transmitting or fails) */ + int state; + pthread_cond_t state_cond; + pthread_mutex_t state_mutex; +}; + +static void *fop_open(const char *fname, void *udata); +static void fop_close(void *fp, void *udata); +static long fop_seek(void *fp, long offs, int whence, void *udata); +static long fop_read(void *fp, void *buf, long size, void *udata); + +static void exit_cleanup(void); +static void download(void *data); +static size_t recv_callback(char *ptr, size_t size, size_t nmemb, void *udata); static const char *get_temp_dir(void); static char *tmpdir, *cachedir; +static struct thread_pool *tpool; +static CURL **curl; struct ass_fileops *ass_alloc_url(const char *url) { static int done_init; struct ass_fileops *fop; + int i; if(!done_init) { curl_global_init(CURL_GLOBAL_ALL); - atexit(curl_global_cleanup); - done_init = 1; + atexit(exit_cleanup); if(!*ass_mod_url_cachedir) { strcpy(ass_mod_url_cachedir, "assman_cache"); } - tmpdir = get_temp_dir(); + tmpdir = (char*)get_temp_dir(); if(!(cachedir = malloc(strlen(ass_mod_url_cachedir) + strlen(tmpdir) + 2))) { fprintf(stderr, "assman: failed to allocate cachedir path buffer\n"); - return 0; + goto init_failed; } sprintf(cachedir, "%s/%s", tmpdir, ass_mod_url_cachedir); + + if(ass_mod_url_max_threads <= 0) { + ass_mod_url_max_threads = 8; + } + + if(!(curl = calloc(ass_mod_url_max_threads, sizeof *curl))) { + perror("assman: failed to allocate curl context table"); + goto init_failed; + } + for(i=0; iudata = malloc(strlen(url) + 1))) { + free(fop); + return 0; + } + strcpy(fop->udata, url); - fop->udata = 0; fop->open = fop_open; fop->close = fop_close; fop->seek = fop_seek; fop->read = fop_read; return fop; + +init_failed: + free(cachedir); + if(curl) { + for(i=0; icache_fname = cache_filename(fname, udata))) { + free(file); + ass_errno = ENOMEM; + return 0; + } + if(!(file->cache_file = fopen(file->cache_fname, "wb"))) { + fprintf(stderr, "assman: mod_url: failed to open cache file (%s) for writing: %s\n", + file->cache_fname, strerror(errno)); + ass_errno = errno; + free(file->cache_fname); + free(file); + return 0; + } + + file->state = DL_UNKNOWN; + pthread_mutex_init(&file->state_mutex, 0); + pthread_cond_init(&file->state_cond, 0); + + ass_tpool_enqueue(tpool, file, download, 0); + + /* wait until the file changes state */ + pthread_mutex_lock(&file->state_mutex); + while(file->state == DL_UNKNOWN) { + pthread_cond_wait(&file->state_cond, &file->state_mutex); + } + state = file->state; + pthread_mutex_unlock(&file->state_mutex); + + if(state == DL_ERROR) { + /* the worker stopped, so we can safely cleanup and return error */ + fclose(file->cache_file); + remove(file->cache_fname); + free(file->cache_fname); + pthread_cond_destroy(&file->state_cond); + pthread_mutex_destroy(&file->state_mutex); + free(file); + ass_errno = ENOENT; /* TODO: differentiate between 403 and 404 */ + return 0; + } + return file; +} + +static void wait_done(struct file_info *file) +{ + pthread_mutex_lock(&file->state_mutex); + while(file->state != DL_DONE && file->state != DL_ERROR) { + pthread_cond_wait(&file->state_cond, &file->state_mutex); + } + pthread_mutex_unlock(&file->state_mutex); } static void fop_close(void *fp, void *udata) { + struct file_info *file = fp; + + wait_done(file); /* TODO: stop download instead of waiting to finish */ + + fclose(file->cache_file); + if(file->state == DL_ERROR) remove(file->cache_fname); + free(file->cache_fname); + pthread_cond_destroy(&file->state_cond); + pthread_mutex_destroy(&file->state_mutex); + free(file); } static long fop_seek(void *fp, long offs, int whence, void *udata) { - return 0; + struct file_info *file = fp; + wait_done(file); + + fseek(file->cache_file, offs, whence); + return ftell(file->cache_file); } static long fop_read(void *fp, void *buf, long size, void *udata) { - return 0; + struct file_info *file = fp; + wait_done(file); + + return fread(buf, 1, size, file->cache_file); } -#else +/* this is the function called by the worker threads to perform the download + * signal state changes, and prepare the cache file for reading + */ +static void download(void *data) +{ + int tid, res; + struct file_info *file = data; + + tid = ass_tpool_thread_id(tpool); + + curl_easy_setopt(curl[tid], CURLOPT_URL, file->url); + curl_easy_setopt(curl[tid], CURLOPT_WRITEDATA, file); + res = curl_easy_perform(curl[tid]); + + pthread_mutex_lock(&file->state_mutex); + if(res == CURLE_OK) { + file->state = DL_DONE; + fclose(file->cache_file); + if(!(file->cache_file = fopen(file->cache_fname, "rb"))) { + fprintf(stderr, "assman: failed to reopen cache file (%s) for reading: %s\n", + file->cache_fname, strerror(errno)); + file->state = DL_ERROR; + } + } else { + file->state = DL_ERROR; + } + pthread_cond_broadcast(&file->state_cond); + pthread_mutex_unlock(&file->state_mutex); +} + +/* this function is called by curl to pass along downloaded data chunks */ +static size_t recv_callback(char *ptr, size_t size, size_t count, void *udata) +{ + struct file_info *file = udata; + + pthread_mutex_lock(&file->state_mutex); + if(file->state == DL_UNKNOWN) { + file->state = DL_STARTED; + pthread_cond_broadcast(&file->state_cond); + } + pthread_mutex_unlock(&file->state_mutex); + + return fwrite(ptr, size, count, file->cache_file); +} + +#ifdef WIN32 +#include + +static const char *get_temp_dir(void) +{ + static char buf[MAX_PATH + 1]; + GetTempPathA(MAX_PATH + 1, buf); + return buf; +} +#else /* UNIX */ +static const char *get_temp_dir(void) +{ + char *env = getenv("TMPDIR"); + return env ? env : "/tmp"; +} +#endif + + + +#else /* don't build mod_url */ struct ass_fileops *ass_alloc_url(const char *url) { fprintf(stderr, "assman: compiled without URL asset source support\n"); diff --git a/src/tpool.c b/src/tpool.c new file mode 100644 index 0000000..500ef6c --- /dev/null +++ b/src/tpool.c @@ -0,0 +1,493 @@ +/* worker thread pool based on POSIX threads + * author: John Tsiombikas + * This code is public domain. + */ +#include +#include +#include +#include +#include "tpool.h" + +#if defined(__APPLE__) && defined(__MACH__) +# ifndef __unix__ +# define __unix__ 1 +# endif /* unix */ +# ifndef __bsd__ +# define __bsd__ 1 +# endif /* bsd */ +#endif /* apple */ + +#if defined(unix) || defined(__unix__) +#include +#include + +# ifdef __bsd__ +# include +# endif +#endif + +#if defined(WIN32) || defined(__WIN32__) +#include +#endif + + +struct work_item { + void *data; + tpool_callback work, done; + struct work_item *next; +}; + +struct thread_data { + int id; + struct thread_pool *pool; +}; + +struct thread_pool { + pthread_t *threads; + struct thread_data *tdata; + int num_threads; + pthread_key_t idkey; + + int qsize; + struct work_item *workq, *workq_tail; + pthread_mutex_t workq_mutex; + pthread_cond_t workq_condvar; + + int nactive; /* number of active workers (not sleeping) */ + + pthread_cond_t done_condvar; + + int should_quit; + int in_batch; + + int nref; /* reference count */ + +#if defined(WIN32) || defined(__WIN32__) + HANDLE wait_event; +#else + int wait_pipe[2]; +#endif +}; + +static void *thread_func(void *args); +static void send_done_event(struct thread_pool *tpool); + +static struct work_item *alloc_work_item(void); +static void free_work_item(struct work_item *w); + + +struct thread_pool *ass_tpool_create(int num_threads) +{ + int i; + struct thread_pool *tpool; + + if(!(tpool = calloc(1, sizeof *tpool))) { + return 0; + } + pthread_mutex_init(&tpool->workq_mutex, 0); + pthread_cond_init(&tpool->workq_condvar, 0); + pthread_cond_init(&tpool->done_condvar, 0); + pthread_key_create(&tpool->idkey, 0); + + pthread_setspecific(tpool->idkey, (void*)0xffffffff); + +#if !defined(WIN32) && !defined(__WIN32__) + tpool->wait_pipe[0] = tpool->wait_pipe[1] = -1; +#endif + + if(num_threads <= 0) { + num_threads = ass_tpool_num_processors(); + } + tpool->num_threads = num_threads; + + if(!(tpool->threads = calloc(num_threads, sizeof *tpool->threads))) { + free(tpool); + return 0; + } + if(!(tpool->tdata = malloc(num_threads * sizeof *tpool->tdata))) { + free(tpool->threads); + free(tpool); + return 0; + } + + for(i=0; itdata[i].id = i; + tpool->tdata[i].pool = tpool; + + if(pthread_create(tpool->threads + i, 0, thread_func, tpool->tdata + i) == -1) { + /*tpool->threads[i] = 0;*/ + ass_tpool_destroy(tpool); + return 0; + } + } + return tpool; +} + +void ass_tpool_destroy(struct thread_pool *tpool) +{ + int i; + if(!tpool) return; + + ass_tpool_clear(tpool); + tpool->should_quit = 1; + + pthread_cond_broadcast(&tpool->workq_condvar); + + if(tpool->threads) { + printf("thread_pool: waiting for %d worker threads to stop ", tpool->num_threads); + fflush(stdout); + + for(i=0; inum_threads; i++) { + pthread_join(tpool->threads[i], 0); + putchar('.'); + fflush(stdout); + } + putchar('\n'); + free(tpool->threads); + } + free(tpool->tdata); + + /* also wake up anyone waiting on the wait* calls */ + tpool->nactive = 0; + pthread_cond_broadcast(&tpool->done_condvar); + send_done_event(tpool); + + pthread_mutex_destroy(&tpool->workq_mutex); + pthread_cond_destroy(&tpool->workq_condvar); + pthread_cond_destroy(&tpool->done_condvar); + pthread_key_delete(tpool->idkey); + +#if defined(WIN32) || defined(__WIN32__) + if(tpool->wait_event) { + CloseHandle(tpool->wait_event); + } +#else + if(tpool->wait_pipe[0] >= 0) { + close(tpool->wait_pipe[0]); + close(tpool->wait_pipe[1]); + } +#endif +} + +int ass_tpool_addref(struct thread_pool *tpool) +{ + return ++tpool->nref; +} + +int ass_tpool_release(struct thread_pool *tpool) +{ + if(--tpool->nref <= 0) { + ass_tpool_destroy(tpool); + return 0; + } + return tpool->nref; +} + +void ass_tpool_begin_batch(struct thread_pool *tpool) +{ + tpool->in_batch = 1; +} + +void ass_tpool_end_batch(struct thread_pool *tpool) +{ + tpool->in_batch = 0; + pthread_cond_broadcast(&tpool->workq_condvar); +} + +int ass_tpool_enqueue(struct thread_pool *tpool, void *data, + tpool_callback work_func, tpool_callback done_func) +{ + struct work_item *job; + + if(!(job = alloc_work_item())) { + return -1; + } + job->work = work_func; + job->done = done_func; + job->data = data; + job->next = 0; + + pthread_mutex_lock(&tpool->workq_mutex); + if(tpool->workq) { + tpool->workq_tail->next = job; + tpool->workq_tail = job; + } else { + tpool->workq = tpool->workq_tail = job; + } + ++tpool->qsize; + pthread_mutex_unlock(&tpool->workq_mutex); + + if(!tpool->in_batch) { + pthread_cond_broadcast(&tpool->workq_condvar); + } + return 0; +} + +void ass_tpool_clear(struct thread_pool *tpool) +{ + pthread_mutex_lock(&tpool->workq_mutex); + while(tpool->workq) { + void *tmp = tpool->workq; + tpool->workq = tpool->workq->next; + free(tmp); + } + tpool->workq = tpool->workq_tail = 0; + tpool->qsize = 0; + pthread_mutex_unlock(&tpool->workq_mutex); +} + +int ass_tpool_queued_jobs(struct thread_pool *tpool) +{ + int res; + pthread_mutex_lock(&tpool->workq_mutex); + res = tpool->qsize; + pthread_mutex_unlock(&tpool->workq_mutex); + return res; +} + +int ass_tpool_active_jobs(struct thread_pool *tpool) +{ + int res; + pthread_mutex_lock(&tpool->workq_mutex); + res = tpool->nactive; + pthread_mutex_unlock(&tpool->workq_mutex); + return res; +} + +int ass_tpool_pending_jobs(struct thread_pool *tpool) +{ + int res; + pthread_mutex_lock(&tpool->workq_mutex); + res = tpool->qsize + tpool->nactive; + pthread_mutex_unlock(&tpool->workq_mutex); + return res; +} + +void ass_tpool_wait(struct thread_pool *tpool) +{ + pthread_mutex_lock(&tpool->workq_mutex); + while(tpool->nactive || tpool->qsize) { + pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex); + } + pthread_mutex_unlock(&tpool->workq_mutex); +} + +void ass_tpool_wait_pending(struct thread_pool *tpool, int pending_target) +{ + pthread_mutex_lock(&tpool->workq_mutex); + while(tpool->qsize + tpool->nactive > pending_target) { + pthread_cond_wait(&tpool->done_condvar, &tpool->workq_mutex); + } + pthread_mutex_unlock(&tpool->workq_mutex); +} + +#if defined(WIN32) || defined(__WIN32__) +long ass_tpool_timedwait(struct thread_pool *tpool, long timeout) +{ + fprintf(stderr, "tpool_timedwait currently unimplemented on windows\n"); + abort(); + return 0; +} + +/* TODO: actually does this work with MinGW ? */ +int ass_tpool_get_wait_fd(struct thread_pool *tpool) +{ + static int once; + if(!once) { + once = 1; + fprintf(stderr, "warning: ass_tpool_get_wait_fd call on Windows does nothing\n"); + } + return 0; +} + +void *ass_tpool_get_wait_handle(struct thread_pool *tpool) +{ + if(!tpool->wait_event) { + if(!(tpool->wait_event = CreateEvent(0, 0, 0, 0))) { + return 0; + } + } + return tpool->wait_event; +} + +static void send_done_event(struct thread_pool *tpool) +{ + if(tpool->wait_event) { + SetEvent(tpool->wait_event); + } +} + +#else /* UNIX */ + +long ass_tpool_timedwait(struct thread_pool *tpool, long timeout) +{ + struct timespec tout_ts; + struct timeval tv0, tv; + gettimeofday(&tv0, 0); + + long sec = timeout / 1000; + tout_ts.tv_nsec = tv0.tv_usec * 1000 + (timeout % 1000) * 1000000; + tout_ts.tv_sec = tv0.tv_sec + sec; + + pthread_mutex_lock(&tpool->workq_mutex); + while(tpool->nactive || tpool->qsize) { + if(pthread_cond_timedwait(&tpool->done_condvar, + &tpool->workq_mutex, &tout_ts) == ETIMEDOUT) { + break; + } + } + pthread_mutex_unlock(&tpool->workq_mutex); + + gettimeofday(&tv, 0); + return (tv.tv_sec - tv0.tv_sec) * 1000 + (tv.tv_usec - tv0.tv_usec) / 1000; +} + +int ass_tpool_get_wait_fd(struct thread_pool *tpool) +{ + if(tpool->wait_pipe[0] < 0) { + if(pipe(tpool->wait_pipe) == -1) { + return -1; + } + } + return tpool->wait_pipe[0]; +} + +void *ass_tpool_get_wait_handle(struct thread_pool *tpool) +{ + static int once; + if(!once) { + once = 1; + fprintf(stderr, "warning: ass_tpool_get_wait_handle call on UNIX does nothing\n"); + } + return 0; +} + +static void send_done_event(struct thread_pool *tpool) +{ + if(tpool->wait_pipe[1] >= 0) { + write(tpool->wait_pipe[1], tpool, 1); + } +} +#endif /* WIN32/UNIX */ + +static void *thread_func(void *args) +{ + struct thread_data *tdata = args; + struct thread_pool *tpool = tdata->pool; + + pthread_setspecific(tpool->idkey, (void*)(intptr_t)tdata->id); + + pthread_mutex_lock(&tpool->workq_mutex); + while(!tpool->should_quit) { + if(!tpool->workq) { + pthread_cond_wait(&tpool->workq_condvar, &tpool->workq_mutex); + if(tpool->should_quit) break; + } + + while(!tpool->should_quit && tpool->workq) { + /* grab the first job */ + struct work_item *job = tpool->workq; + tpool->workq = tpool->workq->next; + if(!tpool->workq) + tpool->workq_tail = 0; + ++tpool->nactive; + --tpool->qsize; + pthread_mutex_unlock(&tpool->workq_mutex); + + /* do the job */ + job->work(job->data); + if(job->done) { + job->done(job->data); + } + free_work_item(job); + + pthread_mutex_lock(&tpool->workq_mutex); + /* notify everyone interested that we're done with this job */ + pthread_cond_broadcast(&tpool->done_condvar); + send_done_event(tpool); + --tpool->nactive; + } + } + pthread_mutex_unlock(&tpool->workq_mutex); + + return 0; +} + + +int ass_tpool_thread_id(struct thread_pool *tpool) +{ + int id = (intptr_t)pthread_getspecific(tpool->idkey); + if(id >= tpool->num_threads) { + return -1; + } + return id; +} + + +/* The following highly platform-specific code detects the number + * of processors available in the system. It's used by the thread pool + * to autodetect how many threads to spawn. + * Currently works on: Linux, BSD, Darwin, and Windows. + */ +int ass_tpool_num_processors(void) +{ +#if defined(unix) || defined(__unix__) +# if defined(__bsd__) + /* BSD systems provide the num.processors through sysctl */ + int num, mib[] = {CTL_HW, HW_NCPU}; + size_t len = sizeof num; + + sysctl(mib, 2, &num, &len, 0, 0); + return num; + +# elif defined(__sgi) + /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */ + return sysconf(_SC_NPROC_ONLN); +# else + /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */ + return sysconf(_SC_NPROCESSORS_ONLN); +# endif /* bsd/sgi/other */ + +#elif defined(WIN32) || defined(__WIN32__) + /* under windows we need to call GetSystemInfo */ + SYSTEM_INFO info; + GetSystemInfo(&info); + return info.dwNumberOfProcessors; +#endif +} + +#define MAX_WPOOL_SIZE 64 +static pthread_mutex_t wpool_lock = PTHREAD_MUTEX_INITIALIZER; +static struct work_item *wpool; +static int wpool_size; + +/* work item allocator */ +static struct work_item *alloc_work_item(void) +{ + struct work_item *w; + + pthread_mutex_lock(&wpool_lock); + if(!wpool) { + pthread_mutex_unlock(&wpool_lock); + return malloc(sizeof(struct work_item)); + } + + w = wpool; + wpool = wpool->next; + --wpool_size; + pthread_mutex_unlock(&wpool_lock); + return w; +} + +static void free_work_item(struct work_item *w) +{ + pthread_mutex_lock(&wpool_lock); + if(wpool_size >= MAX_WPOOL_SIZE) { + free(w); + } else { + w->next = wpool; + wpool = w; + ++wpool_size; + } + pthread_mutex_unlock(&wpool_lock); +} diff --git a/src/tpool.h b/src/tpool.h new file mode 100644 index 0000000..0bb4747 --- /dev/null +++ b/src/tpool.h @@ -0,0 +1,85 @@ +/* worker thread pool based on POSIX threads + * author: John Tsiombikas + * This code is public domain. + */ +#ifndef ASSMAN_THREADPOOL_H_ +#define ASSMAN_THREADPOOL_H_ + +struct thread_pool; + +/* type of the function accepted as work or completion callback */ +typedef void (*tpool_callback)(void*); + +#ifdef __cplusplus +extern "C" { +#endif + +/* if num_threads == 0, auto-detect how many threads to spawn */ +struct thread_pool *ass_tpool_create(int num_threads); +void ass_tpool_destroy(struct thread_pool *tpool); + +/* optional reference counting interface for thread pool sharing */ +int ass_tpool_addref(struct thread_pool *tpool); +int ass_tpool_release(struct thread_pool *tpool); /* will ass_tpool_destroy on nref 0 */ + +/* if begin_batch is called before an enqueue, the worker threads will not be + * signalled to start working until end_batch is called. + */ +void ass_tpool_begin_batch(struct thread_pool *tpool); +void ass_tpool_end_batch(struct thread_pool *tpool); + +/* if enqueue is called without calling begin_batch first, it will immediately + * wake up the worker threads to start working on the enqueued item + */ +int ass_tpool_enqueue(struct thread_pool *tpool, void *data, + tpool_callback work_func, tpool_callback done_func); +/* clear the work queue. does not cancel any currently running jobs */ +void ass_tpool_clear(struct thread_pool *tpool); + +/* returns the number of queued work items */ +int ass_tpool_queued_jobs(struct thread_pool *tpool); +/* returns the number of active (working) threads */ +int ass_tpool_active_jobs(struct thread_pool *tpool); +/* returns the number of pending jobs, both in queue and active */ +int ass_tpool_pending_jobs(struct thread_pool *tpool); + +/* wait for all pending jobs to be completed */ +void ass_tpool_wait(struct thread_pool *tpool); +/* wait until the pending jobs are down to the target specified + * for example, to wait until a single job has been completed: + * ass_tpool_wait_pending(tpool, ass_tpool_pending_jobs(tpool) - 1); + * this interface is slightly awkward to avoid race conditions. */ +void ass_tpool_wait_pending(struct thread_pool *tpool, int pending_target); +/* wait for all pending jobs to be completed for up to "timeout" milliseconds */ +long ass_tpool_timedwait(struct thread_pool *tpool, long timeout); + +/* return a file descriptor which can be used to wait for pending job + * completion events. A single char is written every time a job completes. + * You should empty the pipe every time you receive such an event. + * + * This is a UNIX-specific call. On windows it does nothing. + */ +int ass_tpool_get_wait_fd(struct thread_pool *tpool); + +/* return an auto-resetting Event HANDLE which can be used to wait for + * pending job completion events. + * + * This is a Win32-specific call. On UNIX it does nothing. + */ +void *ass_tpool_get_wait_handle(struct thread_pool *tpool); + +/* When called by a work/done callback, it returns the thread number executing + * it. From the main thread it returns -1. + */ +int ass_tpool_thread_id(struct thread_pool *tpool); + +/* returns the number of processors on the system. + * individual cores in multi-core processors are counted as processors. + */ +int ass_tpool_num_processors(void); + +#ifdef __cplusplus +} +#endif + +#endif /* ASSMAN_THREADPOOL_H_ */ -- 1.7.10.4