ftwin 0.8.10
napr_threadpool.c
Go to the documentation of this file.
1
6/*
7 * Copyright (C) 2007 François Pesce : francois.pesce (at) gmail (dot) com
8 *
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 *
13 * http://www.apache.org/licenses/LICENSE-2.0
14 *
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 */
21
22#include <stdlib.h>
23#include <stdio.h>
24
25#include <apr_thread_proc.h>
26#include <apr_thread_mutex.h>
27#include <apr_thread_cond.h>
28
29#include "napr_threadpool.h"
30#include "debug.h"
31
32/* Definitions of some basic list methods, see further to find pure threadpool stuff */
/*
 * One node of the singly linked list used as the work queue: an opaque
 * payload pointer supplied by the caller and the link to the following node.
 */
typedef struct napr_cell_t
{
    struct napr_cell_t *next;   /* following cell, NULL on the last cell */
    void *data;                 /* payload stored by the caller */
} napr_cell_t;
40
41typedef struct napr_list_t napr_list_t;
42
43struct napr_list_t
44{
45 apr_pool_t *p;
46 napr_cell_t *head;
47 napr_cell_t *tail;
48 unsigned long nb_cells;
49};
50
51static inline napr_list_t *napr_list_make(apr_pool_t *p)
52{
53 apr_pool_t *local_pool;
54 napr_list_t *napr_list;
55
56 apr_pool_create(&local_pool, p);
57 napr_list = apr_palloc(local_pool, sizeof(struct napr_list_t));
58
59 if (napr_list != NULL) {
60 napr_list->p = local_pool;
61 napr_list->head = NULL;
62 napr_list->tail = NULL;
63 napr_list->nb_cells = 0UL;
64 }
65
66 return napr_list;
67}
68
69static int napr_list_cons(napr_list_t *napr_list, void *element)
70{
71 napr_cell_t *cell;
72 int rc;
74 if (NULL != (cell = (napr_cell_t *) apr_palloc(napr_list->p, sizeof(struct napr_cell_t)))) {
75 cell->data = element;
76 cell->next = napr_list->head;
77 napr_list->head = cell;
78 napr_list->nb_cells += 1;
79
80 if (napr_list->nb_cells == 1)
81 napr_list->tail = napr_list->head;
82
83 rc = 0;
84 }
85 else {
86 rc = -1;
87 }
88
89 return rc;
90}
91
92static inline void napr_list_cdr(napr_list_t *napr_list)
93{
94 void *cell;
95
96 cell = napr_list->head->next;
97 napr_list->nb_cells -= 1UL;
98 napr_list->head = cell;
99}
100
101static inline void napr_list_delete(napr_list_t *napr_list)
102{
103 while (napr_list->head != NULL)
104 napr_list_cdr(napr_list);
105
106 apr_pool_destroy(napr_list->p);
107}
108
109static inline int napr_list_enqueue(napr_list_t *napr_list, void *element)
110{
111 napr_cell_t *cell;
112 int rc = 0;
113
114 if (0 != napr_list->nb_cells) {
115 if (NULL != (cell = (napr_cell_t *) apr_palloc(napr_list->p, sizeof(struct napr_cell_t)))) {
116 napr_list->nb_cells += 1UL;
117 cell->data = element;
118 cell->next = NULL;
119 napr_list->tail->next = cell;
120 napr_list->tail = cell;
121 }
122 else
123 rc = -1;
124 }
125 else {
126 rc = napr_list_cons(napr_list, element);
127 }
128
129 return rc;
130}
131
132static inline napr_cell_t *napr_list_first(napr_list_t *napr_list)
133{
134 return napr_list->head;
135}
136
137static inline unsigned long napr_list_size(napr_list_t *napr_list)
138{
139 return napr_list->nb_cells;
140}
141
#if 0
/* Currently compiled out: return the cell following @p cell. */
static inline napr_cell_t *napr_list_next(napr_cell_t *cell)
{
    napr_cell_t *following = cell->next;

    return following;
}
#endif
148
149static inline void *napr_list_get(napr_cell_t *cell)
150{
151 return cell->data;
152}
153
154/* The threadpool structures and engine */
156{
157 apr_thread_t **thread;
158
159 void *ctx;
160 /* This mutex protects everything below in writing and reading */
161 apr_thread_mutex_t *threadpool_mutex;
162 apr_thread_cond_t *threadpool_update;
163 unsigned long nb_thread;
164 unsigned long nb_waiting;
165 napr_list_t *list;
167 apr_pool_t *pool;
168
169 /*
170 * run == 0x1 means we are currently processing data
171 * run == 0x0 means we are creating the pool, OR (exclusive) we were
172 * processing data and encountered a end of list and all thread are
173 * waiting (no more data).
174 * Algorithm :
175 * run &= 0x0;
176 * create the thread pool loops :
177 * code in the loop say :
178 * run at 0x0, we can be nb_thread == nb_waiting in a "wait" mode.
179 * This doesn't matter.
180 *
181 * external caller function add all data to process, then call the napr_threadpool_wait:
182 * napr_threadpool_add_data:
183 * fill list.
184 * napr_threadpool_wait:
185 * run |= 0x1;
186 *
187 * code in the loop say :
188 * run at 0x1, if we are at nb_thread == nb_waiting I'd better warn
189 * the caller of the napr_threadpool_wait function that the processing
190 * is ended.
191 */
192
193 unsigned int run:1;
194 unsigned int ended:1;
195 unsigned int shutdown:1;
196};
197
198static void *APR_THREAD_FUNC napr_threadpool_loop(apr_thread_t *thd, void *rec);
199
200extern apr_status_t napr_threadpool_init(napr_threadpool_t **threadpool, void *ctx, unsigned long nb_thread,
201 threadpool_process_data_callback_fn_t *process_data, apr_pool_t *pool)
202{
203 char errbuf[128];
204 apr_pool_t *local_pool;
205 unsigned long l;
206 apr_status_t status;
207
208 apr_pool_create(&local_pool, pool);
209 (*threadpool) = apr_palloc(local_pool, sizeof(struct napr_threadpool_t));
210 (*threadpool)->pool = local_pool;
211
212 if (APR_SUCCESS !=
213 (status =
214 apr_thread_mutex_create(&((*threadpool)->threadpool_mutex), APR_THREAD_MUTEX_DEFAULT, (*threadpool)->pool))) {
215 DEBUG_ERR("error calling apr_thread_mutex_create: %s", apr_strerror(status, errbuf, 128));
216 return status;
217 }
218 if (APR_SUCCESS != (status = apr_thread_cond_create(&((*threadpool)->threadpool_update), (*threadpool)->pool))) {
219 DEBUG_ERR("error calling apr_thread_cond_create: %s", apr_strerror(status, errbuf, 128));
220 return status;
221 }
222 (*threadpool)->thread = apr_palloc((*threadpool)->pool, nb_thread * sizeof(apr_thread_mutex_t *));
223 (*threadpool)->ctx = ctx;
224 (*threadpool)->nb_thread = nb_thread;
225 (*threadpool)->nb_waiting = 0UL;
226 (*threadpool)->list = napr_list_make((*threadpool)->pool);
227 (*threadpool)->process_data = process_data;
228 (*threadpool)->run &= 0x0;
229 (*threadpool)->ended &= 0x0;
230 (*threadpool)->shutdown &= 0x0;
231
232 for (l = 0; l < nb_thread; l++) {
233 if (APR_SUCCESS !=
234 (status =
235 apr_thread_create(&((*threadpool)->thread[l]), NULL, napr_threadpool_loop, (*threadpool),
236 (*threadpool)->pool))) {
237 DEBUG_ERR("error calling apr_thread_create: %s", apr_strerror(status, errbuf, 128));
238 return status;
239 }
240 }
241
242 return APR_SUCCESS;
243}
244
245extern apr_status_t napr_threadpool_add(napr_threadpool_t *threadpool, void *data)
246{
247 char errbuf[128];
248 apr_status_t status;
249
250 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
251 DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
252 return status;
253 }
254 threadpool->ended &= 0x0;
255 napr_list_enqueue(threadpool->list, data);
256 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
257 DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
258 return status;
259 }
260 if (APR_SUCCESS != (status = apr_thread_cond_signal(threadpool->threadpool_update))) {
261 DEBUG_ERR("error calling apr_thread_cond_signal: %s", apr_strerror(status, errbuf, 128));
262 return status;
263 }
264
265 return APR_SUCCESS;
266}
267
268extern apr_status_t napr_threadpool_wait(napr_threadpool_t *threadpool)
269{
270 char errbuf[128];
271 apr_status_t status;
272 int list_size = 0;
273
274 /* DEBUG_DBG("Called"); */
275 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
276 DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
277 return status;
278 }
279 list_size = napr_list_size(threadpool->list);
280 if ((0 != list_size) || (threadpool->nb_waiting != threadpool->nb_thread)) {
281 /* DEBUG_DBG("After lock before wait"); */
282 /*
283 * Because the caller of this function has added all data we are now
284 * running, the line after, (cond_wait), make us wait to the end of
285 * processing.
286 */
287 threadpool->run |= 0x1;
288
289 if (APR_SUCCESS != (status = apr_thread_cond_wait(threadpool->threadpool_update, threadpool->threadpool_mutex))) {
290 DEBUG_ERR("error calling apr_thread_cond_wait: %s", apr_strerror(status, errbuf, 128));
291 return status;
292 }
293 /* DEBUG_DBG("Awake"); */
294 /* Because all data have been processed, we are no more running */
295 threadpool->run &= 0x0;
296 }
297 /*
298 * garbage collecting under lock protection to avoid list manipulation
299 * during this freeing.
300 * We are doing this garbage collecting because if not, we will re-using
301 * the same pool to very-often alloc cell_t structures in list.
302 * This could lead to a memory leak if not regularly cleaned.
303 */
304 napr_list_delete(threadpool->list);
305 threadpool->list = napr_list_make(threadpool->pool);
306 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
307 DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
308 return status;
309 }
310
311 return APR_SUCCESS;
312}
313
314static void *APR_THREAD_FUNC napr_threadpool_loop(apr_thread_t *thd, void *rec)
315{
316 char errbuf[128];
317 napr_threadpool_t *threadpool = rec;
318 apr_status_t status;
319
320 /* lock the mutex, to access the list exclusively. */
321 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
322 DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
323 return NULL;
324 }
325
326 /* do forever.... (unless shutdown is requested) */
327 while (!(threadpool->shutdown & 0x1)) {
328 /* DEBUG_DBG("list_size: %lu", napr_list_size(threadpool->list)); */
329 if ((0 < napr_list_size(threadpool->list)) && !(threadpool->ended & 0x1)) {
330 napr_cell_t *cell;
331 void *data;
332
333 cell = napr_list_first(threadpool->list);
334 data = napr_list_get(cell);
335 napr_list_cdr(threadpool->list);
336 if (data) {
337 /*
338 * unlock mutex - because other threads would be able to handle
339 * other data waiting in the queue paralelly.
340 */
341 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
342 DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
343 return NULL;
344 }
345 threadpool->process_data(threadpool->ctx, data);
346 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
347 DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
348 return NULL;
349 }
350 }
351 }
352 else { /* The waiting else */
353 threadpool->nb_waiting += 1UL;
354
355 /* DEBUG_DBG("run: %i waiting: %lu / thread: %lu", (threadpool->run & 0x1) ? 1 : 0, threadpool->nb_waiting, threadpool->nb_thread); */
356 /* Should not broadcast if it's already ended, because it may lead to an infinite loop */
357 if (!(threadpool->ended & 0x1) && (threadpool->run & 0x1) && (threadpool->nb_waiting == threadpool->nb_thread)) {
358 threadpool->ended |= 0x1;
359 if (APR_SUCCESS != (status = apr_thread_cond_broadcast(threadpool->threadpool_update))) {
360 DEBUG_ERR("error calling apr_thread_cond_signal: %s", apr_strerror(status, errbuf, 128));
361 return NULL;
362 }
363 }
364
365 /*
366 * wait for a new data. note the mutex will be unlocked in
367 * apr_thread_cond_wait(), thus allowing other threads access to data
368 * list.
369 */
370 if (APR_SUCCESS != (status = apr_thread_cond_wait(threadpool->threadpool_update, threadpool->threadpool_mutex))) {
371 DEBUG_ERR("error calling apr_thread_cond_wait: %s", apr_strerror(status, errbuf, 128));
372 return NULL;
373 }
374 /*
375 * after we return from apr_thread_cond_wait, the mutex is locked
376 * again, so we don't need to lock it ourselves
377 */
378 threadpool->nb_waiting -= 1UL;
379 }
380 }
381
382 /* Unlock mutex before exiting thread */
383 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
384 DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
385 }
386
387 return NULL;
388}
389
390extern apr_status_t napr_threadpool_shutdown(napr_threadpool_t *threadpool)
391{
392 char errbuf[128];
393 apr_status_t status, rv;
394 unsigned long l;
395
396 /* Lock mutex to set shutdown flag */
397 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
398 DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
399 return status;
400 }
401
402 /* Signal all threads to shutdown */
403 threadpool->shutdown |= 0x1;
404
405 /* Broadcast to wake up all waiting threads */
406 if (APR_SUCCESS != (status = apr_thread_cond_broadcast(threadpool->threadpool_update))) {
407 DEBUG_ERR("error calling apr_thread_cond_broadcast: %s", apr_strerror(status, errbuf, 128));
408 apr_thread_mutex_unlock(threadpool->threadpool_mutex);
409 return status;
410 }
411
412 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
413 DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
414 return status;
415 }
416
417 /* Join all threads to wait for them to exit */
418 for (l = 0; l < threadpool->nb_thread; l++) {
419 if (APR_SUCCESS != (status = apr_thread_join(&rv, threadpool->thread[l]))) {
420 DEBUG_ERR("error calling apr_thread_join: %s", apr_strerror(status, errbuf, 128));
421 return status;
422 }
423 }
424
425 return APR_SUCCESS;
426}
UTIL debug output macros.
#define DEBUG_ERR(str, arg...)
Display error message at the level error.
Definition debug.h:31
apr_status_t napr_threadpool_shutdown(napr_threadpool_t *threadpool)
Shuts down the thread pool.
apr_status_t napr_threadpool_wait(napr_threadpool_t *threadpool)
Waits until all tasks currently in the queue have been processed.
static int napr_list_cons(napr_list_t *napr_list, void *element)
apr_status_t napr_threadpool_init(napr_threadpool_t **threadpool, void *ctx, unsigned long nb_thread, threadpool_process_data_callback_fn_t *process_data, apr_pool_t *pool)
Initializes a thread pool.
apr_status_t napr_threadpool_add(napr_threadpool_t *threadpool, void *data)
Adds a task (a data item) to the thread pool's processing queue.
A simple fixed-size thread pool for concurrent task processing.
struct napr_threadpool_t napr_threadpool_t
Opaque thread pool structure.
apr_status_t() threadpool_process_data_callback_fn_t(void *ctx, void *data)
The worker function executed by threads in the pool.