25#include <apr_thread_proc.h>
26#include <apr_thread_mutex.h>
27#include <apr_thread_cond.h>
33typedef struct napr_cell_t napr_cell_t;
41typedef struct napr_list_t napr_list_t;
48 unsigned long nb_cells;
51static inline napr_list_t *napr_list_make(apr_pool_t *p)
53 apr_pool_t *local_pool;
54 napr_list_t *napr_list;
56 apr_pool_create(&local_pool, p);
57 napr_list = apr_palloc(local_pool,
sizeof(
struct napr_list_t));
59 if (napr_list != NULL) {
60 napr_list->p = local_pool;
61 napr_list->head = NULL;
62 napr_list->tail = NULL;
63 napr_list->nb_cells = 0UL;
74 if (NULL != (cell = (napr_cell_t *) apr_palloc(napr_list->p,
sizeof(
struct napr_cell_t)))) {
76 cell->next = napr_list->head;
77 napr_list->head = cell;
78 napr_list->nb_cells += 1;
80 if (napr_list->nb_cells == 1)
81 napr_list->tail = napr_list->head;
92static inline void napr_list_cdr(napr_list_t *napr_list)
96 cell = napr_list->head->next;
97 napr_list->nb_cells -= 1UL;
98 napr_list->head = cell;
101static inline void napr_list_delete(napr_list_t *napr_list)
103 while (napr_list->head != NULL)
104 napr_list_cdr(napr_list);
106 apr_pool_destroy(napr_list->p);
109static inline int napr_list_enqueue(napr_list_t *napr_list,
void *element)
114 if (0 != napr_list->nb_cells) {
115 if (NULL != (cell = (napr_cell_t *) apr_palloc(napr_list->p,
sizeof(
struct napr_cell_t)))) {
116 napr_list->nb_cells += 1UL;
117 cell->data = element;
119 napr_list->tail->next = cell;
120 napr_list->tail = cell;
132static inline napr_cell_t *napr_list_first(napr_list_t *napr_list)
134 return napr_list->head;
137static inline unsigned long napr_list_size(napr_list_t *napr_list)
139 return napr_list->nb_cells;
143static inline napr_cell_t *napr_list_next(napr_cell_t *cell)
149static inline void *napr_list_get(napr_cell_t *cell)
157 apr_thread_t **thread;
161 apr_thread_mutex_t *threadpool_mutex;
162 apr_thread_cond_t *threadpool_update;
163 unsigned long nb_thread;
164 unsigned long nb_waiting;
194 unsigned int ended:1;
195 unsigned int shutdown:1;
198static void *APR_THREAD_FUNC napr_threadpool_loop(apr_thread_t *thd,
void *rec);
204 apr_pool_t *local_pool;
208 apr_pool_create(&local_pool, pool);
210 (*threadpool)->pool = local_pool;
214 apr_thread_mutex_create(&((*threadpool)->threadpool_mutex), APR_THREAD_MUTEX_DEFAULT, (*threadpool)->pool))) {
215 DEBUG_ERR(
"error calling apr_thread_mutex_create: %s", apr_strerror(status, errbuf, 128));
218 if (APR_SUCCESS != (status = apr_thread_cond_create(&((*threadpool)->threadpool_update), (*threadpool)->pool))) {
219 DEBUG_ERR(
"error calling apr_thread_cond_create: %s", apr_strerror(status, errbuf, 128));
222 (*threadpool)->thread = apr_palloc((*threadpool)->pool, nb_thread *
sizeof(apr_thread_mutex_t *));
223 (*threadpool)->ctx = ctx;
224 (*threadpool)->nb_thread = nb_thread;
225 (*threadpool)->nb_waiting = 0UL;
226 (*threadpool)->list = napr_list_make((*threadpool)->pool);
227 (*threadpool)->process_data = process_data;
228 (*threadpool)->run &= 0x0;
229 (*threadpool)->ended &= 0x0;
230 (*threadpool)->shutdown &= 0x0;
232 for (l = 0; l < nb_thread; l++) {
235 apr_thread_create(&((*threadpool)->thread[l]), NULL, napr_threadpool_loop, (*threadpool),
236 (*threadpool)->pool))) {
237 DEBUG_ERR(
"error calling apr_thread_create: %s", apr_strerror(status, errbuf, 128));
250 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
251 DEBUG_ERR(
"error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
254 threadpool->ended &= 0x0;
255 napr_list_enqueue(threadpool->list, data);
256 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
257 DEBUG_ERR(
"error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
260 if (APR_SUCCESS != (status = apr_thread_cond_signal(threadpool->threadpool_update))) {
261 DEBUG_ERR(
"error calling apr_thread_cond_signal: %s", apr_strerror(status, errbuf, 128));
275 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
276 DEBUG_ERR(
"error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
279 list_size = napr_list_size(threadpool->list);
280 if ((0 != list_size) || (threadpool->nb_waiting != threadpool->nb_thread)) {
287 threadpool->run |= 0x1;
289 if (APR_SUCCESS != (status = apr_thread_cond_wait(threadpool->threadpool_update, threadpool->threadpool_mutex))) {
290 DEBUG_ERR(
"error calling apr_thread_cond_wait: %s", apr_strerror(status, errbuf, 128));
295 threadpool->run &= 0x0;
304 napr_list_delete(threadpool->list);
305 threadpool->list = napr_list_make(threadpool->pool);
306 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
307 DEBUG_ERR(
"error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
314static void *APR_THREAD_FUNC napr_threadpool_loop(apr_thread_t *thd,
void *rec)
321 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
322 DEBUG_ERR(
"error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
327 while (!(threadpool->shutdown & 0x1)) {
329 if ((0 < napr_list_size(threadpool->list)) && !(threadpool->ended & 0x1)) {
333 cell = napr_list_first(threadpool->list);
334 data = napr_list_get(cell);
335 napr_list_cdr(threadpool->list);
341 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
342 DEBUG_ERR(
"error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
345 threadpool->process_data(threadpool->ctx, data);
346 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
347 DEBUG_ERR(
"error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
353 threadpool->nb_waiting += 1UL;
357 if (!(threadpool->ended & 0x1) && (threadpool->run & 0x1) && (threadpool->nb_waiting == threadpool->nb_thread)) {
358 threadpool->ended |= 0x1;
359 if (APR_SUCCESS != (status = apr_thread_cond_broadcast(threadpool->threadpool_update))) {
360 DEBUG_ERR(
"error calling apr_thread_cond_signal: %s", apr_strerror(status, errbuf, 128));
370 if (APR_SUCCESS != (status = apr_thread_cond_wait(threadpool->threadpool_update, threadpool->threadpool_mutex))) {
371 DEBUG_ERR(
"error calling apr_thread_cond_wait: %s", apr_strerror(status, errbuf, 128));
378 threadpool->nb_waiting -= 1UL;
383 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
384 DEBUG_ERR(
"error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
393 apr_status_t status, rv;
397 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
398 DEBUG_ERR(
"error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
403 threadpool->shutdown |= 0x1;
406 if (APR_SUCCESS != (status = apr_thread_cond_broadcast(threadpool->threadpool_update))) {
407 DEBUG_ERR(
"error calling apr_thread_cond_broadcast: %s", apr_strerror(status, errbuf, 128));
408 apr_thread_mutex_unlock(threadpool->threadpool_mutex);
412 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
413 DEBUG_ERR(
"error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
418 for (l = 0; l < threadpool->nb_thread; l++) {
419 if (APR_SUCCESS != (status = apr_thread_join(&rv, threadpool->thread[l]))) {
420 DEBUG_ERR(
"error calling apr_thread_join: %s", apr_strerror(status, errbuf, 128));
UTIL debug output macros.
#define DEBUG_ERR(str, arg...)
Display error message at the level error.
apr_status_t napr_threadpool_shutdown(napr_threadpool_t *threadpool)
Shuts down the thread pool.
apr_status_t napr_threadpool_wait(napr_threadpool_t *threadpool)
Waits until all tasks currently in the queue have been processed.
static int napr_list_cons(napr_list_t *napr_list, void *element)
apr_status_t napr_threadpool_init(napr_threadpool_t **threadpool, void *ctx, unsigned long nb_thread, threadpool_process_data_callback_fn_t *process_data, apr_pool_t *pool)
Initializes a thread pool.
apr_status_t napr_threadpool_add(napr_threadpool_t *threadpool, void *data)
Adds a task (a data item) to the thread pool's processing queue.
A simple fixed-size thread pool for concurrent task processing.
struct napr_threadpool_t napr_threadpool_t
Opaque thread pool structure.
apr_status_t() threadpool_process_data_callback_fn_t(void *ctx, void *data)
The worker function executed by threads in the pool.