Line data Source code
1 : /**
2 : * @file napr_threadpool.c
3 : * @brief Implementation of the fixed-size thread pool.
4 : * @ingroup DataStructures
5 : */
6 : /*
7 : * Copyright (C) 2007 François Pesce : francois.pesce (at) gmail (dot) com
8 : *
9 : * Licensed under the Apache License, Version 2.0 (the "License");
10 : * you may not use this file except in compliance with the License.
11 : * You may obtain a copy of the License at
12 : *
13 : * http://www.apache.org/licenses/LICENSE-2.0
14 : *
15 : * Unless required by applicable law or agreed to in writing, software
16 : * distributed under the License is distributed on an "AS IS" BASIS,
17 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 : * See the License for the specific language governing permissions and
19 : * limitations under the License.
20 : */
21 :
22 : #include <stdlib.h>
23 : #include <stdio.h>
24 :
25 : #include <apr_thread_proc.h>
26 : #include <apr_thread_mutex.h>
27 : #include <apr_thread_cond.h>
28 :
29 : #include "napr_threadpool.h"
30 : #include "debug.h"
31 :
/* Minimal singly linked list helpers; the thread pool implementation proper starts further down. */
/* One cell of the singly linked work list. */
typedef struct napr_cell_t napr_cell_t;

struct napr_cell_t
{
    napr_cell_t *next;  /* following cell; NULL on a cell appended by napr_list_enqueue */
    void *data;         /* element pointer handed in by the caller, stored as-is */
};
40 :
/* Singly linked list used as a FIFO queue (enqueue at the tail, remove at the head). */
typedef struct napr_list_t napr_list_t;

struct napr_list_t
{
    apr_pool_t *p;          /* pool the list header and every cell are allocated from */
    napr_cell_t *head;      /* first cell, NULL when the list is empty */
    napr_cell_t *tail;      /* last cell; napr_list_cdr never resets it, so it is only read while nb_cells != 0 */
    unsigned long nb_cells; /* number of cells currently linked */
};
50 :
51 22 : static inline napr_list_t *napr_list_make(apr_pool_t *p)
52 : {
53 : apr_pool_t *local_pool;
54 : napr_list_t *napr_list;
55 :
56 22 : apr_pool_create(&local_pool, p);
57 22 : napr_list = apr_palloc(local_pool, sizeof(struct napr_list_t));
58 :
59 22 : if (napr_list != NULL) {
60 22 : napr_list->p = local_pool;
61 22 : napr_list->head = NULL;
62 22 : napr_list->tail = NULL;
63 22 : napr_list->nb_cells = 0UL;
64 : }
65 :
66 22 : return napr_list;
67 : }
68 :
69 11 : static int napr_list_cons(napr_list_t *napr_list, void *element)
70 : {
71 : napr_cell_t *cell;
72 : int rc; /** return code. */
73 :
74 11 : if (NULL != (cell = (napr_cell_t *) apr_palloc(napr_list->p, sizeof(struct napr_cell_t)))) {
75 11 : cell->data = element;
76 11 : cell->next = napr_list->head;
77 11 : napr_list->head = cell;
78 11 : napr_list->nb_cells += 1;
79 :
80 11 : if (napr_list->nb_cells == 1)
81 11 : napr_list->tail = napr_list->head;
82 :
83 11 : rc = 0;
84 : }
85 : else {
86 0 : rc = -1;
87 : }
88 :
89 11 : return rc;
90 : }
91 :
92 93 : static inline void napr_list_cdr(napr_list_t *napr_list)
93 : {
94 : void *cell;
95 :
96 93 : cell = napr_list->head->next;
97 93 : napr_list->nb_cells -= 1UL;
98 93 : napr_list->head = cell;
99 93 : }
100 :
101 11 : static inline void napr_list_delete(napr_list_t *napr_list)
102 : {
103 11 : while (napr_list->head != NULL)
104 0 : napr_list_cdr(napr_list);
105 :
106 11 : apr_pool_destroy(napr_list->p);
107 11 : }
108 :
109 93 : static inline int napr_list_enqueue(napr_list_t *napr_list, void *element)
110 : {
111 : napr_cell_t *cell;
112 93 : int rc = 0;
113 :
114 93 : if (0 != napr_list->nb_cells) {
115 82 : if (NULL != (cell = (napr_cell_t *) apr_palloc(napr_list->p, sizeof(struct napr_cell_t)))) {
116 82 : napr_list->nb_cells += 1UL;
117 82 : cell->data = element;
118 82 : cell->next = NULL;
119 82 : napr_list->tail->next = cell;
120 82 : napr_list->tail = cell;
121 : }
122 : else
123 0 : rc = -1;
124 : }
125 : else {
126 11 : rc = napr_list_cons(napr_list, element);
127 : }
128 :
129 93 : return rc;
130 : }
131 :
132 93 : static inline napr_cell_t *napr_list_first(napr_list_t *napr_list)
133 : {
134 93 : return napr_list->head;
135 : }
136 :
137 223 : static inline unsigned long napr_list_size(napr_list_t *napr_list)
138 : {
139 223 : return napr_list->nb_cells;
140 : }
141 :
/*
 * Disabled with #if 0, so this accessor is not compiled.
 * NOTE(review): apparently kept for reference only; nothing visible in this
 * file calls it -- confirm before re-enabling or removing.
 */
#if 0
/* Return the cell that follows @p cell. */
static inline napr_cell_t *napr_list_next(napr_cell_t *cell)
{
    return cell->next;
}
#endif
148 :
149 93 : static inline void *napr_list_get(napr_cell_t *cell)
150 : {
151 93 : return cell->data;
152 : }
153 :
154 : /* The threadpool structures and engine */
/* State shared between the pool's public functions and its worker threads. */
struct napr_threadpool_t
{
    /* Array of nb_thread worker handles, filled by napr_threadpool_init(). */
    apr_thread_t **thread;

    /* Opaque pointer passed as first argument to process_data. */
    void *ctx;
    /* This mutex protects everything below, for both reading and writing. */
    apr_thread_mutex_t *threadpool_mutex;
    /* Condition used both to wake workers and to wake napr_threadpool_wait(). */
    apr_thread_cond_t *threadpool_update;
    /* Number of worker threads requested at init time. */
    unsigned long nb_thread;
    /* Number of workers currently blocked in the "waiting" branch of the loop. */
    unsigned long nb_waiting;
    /* Queue of pending data; replaced by a fresh list at the end of each wait. */
    napr_list_t *list;
    /* Callback invoked by a worker for every dequeued, non-NULL data pointer. */
    threadpool_process_data_callback_fn_t *process_data;
    /* Sub-pool owning this structure, the mutex, the condition and the threads. */
    apr_pool_t *pool;

    /*
     * run == 0x1 means a caller is inside napr_threadpool_wait() and the
     * queued data is being processed.
     * run == 0x0 means either the pool has just been created, or processing
     * is over: the list was exhausted and every thread is waiting.
     *
     * Protocol:
     *   - napr_threadpool_init() clears run and starts the worker loops.
     *     While run is 0x0 the workers may all be waiting
     *     (nb_thread == nb_waiting); nobody needs to be told.
     *   - The external caller queues all its data with napr_threadpool_add(),
     *     then calls napr_threadpool_wait(), which sets run to 0x1.
     *   - While run is 0x1, the worker that makes nb_waiting reach nb_thread
     *     sets ended and broadcasts, telling the caller blocked in
     *     napr_threadpool_wait() that processing is finished.
     *
     * ended is cleared again by napr_threadpool_add(); while it is set,
     * workers neither dequeue data nor broadcast a second time.
     * shutdown is set by napr_threadpool_shutdown() and makes every worker
     * leave its loop.
     */

    unsigned int run:1;
    unsigned int ended:1;
    unsigned int shutdown:1;
};
197 :
198 : static void *APR_THREAD_FUNC napr_threadpool_loop(apr_thread_t *thd, void *rec);
199 :
200 11 : extern apr_status_t napr_threadpool_init(napr_threadpool_t **threadpool, void *ctx, unsigned long nb_thread,
201 : threadpool_process_data_callback_fn_t *process_data, apr_pool_t *pool)
202 : {
203 : char errbuf[128];
204 : apr_pool_t *local_pool;
205 : unsigned long l;
206 : apr_status_t status;
207 :
208 11 : apr_pool_create(&local_pool, pool);
209 11 : (*threadpool) = apr_palloc(local_pool, sizeof(struct napr_threadpool_t));
210 11 : (*threadpool)->pool = local_pool;
211 :
212 11 : if (APR_SUCCESS !=
213 : (status =
214 11 : apr_thread_mutex_create(&((*threadpool)->threadpool_mutex), APR_THREAD_MUTEX_DEFAULT, (*threadpool)->pool))) {
215 0 : DEBUG_ERR("error calling apr_thread_mutex_create: %s", apr_strerror(status, errbuf, 128));
216 0 : return status;
217 : }
218 11 : if (APR_SUCCESS != (status = apr_thread_cond_create(&((*threadpool)->threadpool_update), (*threadpool)->pool))) {
219 0 : DEBUG_ERR("error calling apr_thread_cond_create: %s", apr_strerror(status, errbuf, 128));
220 0 : return status;
221 : }
222 11 : (*threadpool)->thread = apr_palloc((*threadpool)->pool, nb_thread * sizeof(apr_thread_mutex_t *));
223 11 : (*threadpool)->ctx = ctx;
224 11 : (*threadpool)->nb_thread = nb_thread;
225 11 : (*threadpool)->nb_waiting = 0UL;
226 11 : (*threadpool)->list = napr_list_make((*threadpool)->pool);
227 11 : (*threadpool)->process_data = process_data;
228 11 : (*threadpool)->run &= 0x0;
229 11 : (*threadpool)->ended &= 0x0;
230 11 : (*threadpool)->shutdown &= 0x0;
231 :
232 91 : for (l = 0; l < nb_thread; l++) {
233 80 : if (APR_SUCCESS !=
234 : (status =
235 80 : apr_thread_create(&((*threadpool)->thread[l]), NULL, napr_threadpool_loop, (*threadpool),
236 80 : (*threadpool)->pool))) {
237 0 : DEBUG_ERR("error calling apr_thread_create: %s", apr_strerror(status, errbuf, 128));
238 0 : return status;
239 : }
240 : }
241 :
242 11 : return APR_SUCCESS;
243 : }
244 :
245 93 : extern apr_status_t napr_threadpool_add(napr_threadpool_t *threadpool, void *data)
246 : {
247 : char errbuf[128];
248 : apr_status_t status;
249 :
250 93 : if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
251 0 : DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
252 0 : return status;
253 : }
254 93 : threadpool->ended &= 0x0;
255 93 : napr_list_enqueue(threadpool->list, data);
256 93 : if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
257 0 : DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
258 0 : return status;
259 : }
260 93 : if (APR_SUCCESS != (status = apr_thread_cond_signal(threadpool->threadpool_update))) {
261 0 : DEBUG_ERR("error calling apr_thread_cond_signal: %s", apr_strerror(status, errbuf, 128));
262 0 : return status;
263 : }
264 :
265 93 : return APR_SUCCESS;
266 : }
267 :
268 11 : extern apr_status_t napr_threadpool_wait(napr_threadpool_t *threadpool)
269 : {
270 : char errbuf[128];
271 : apr_status_t status;
272 11 : int list_size = 0;
273 :
274 : /* DEBUG_DBG("Called"); */
275 11 : if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
276 0 : DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
277 0 : return status;
278 : }
279 11 : list_size = napr_list_size(threadpool->list);
280 11 : if ((0 != list_size) || (threadpool->nb_waiting != threadpool->nb_thread)) {
281 : /* DEBUG_DBG("After lock before wait"); */
282 : /*
283 : * Because the caller of this function has added all data we are now
284 : * running, the line after, (cond_wait), make us wait to the end of
285 : * processing.
286 : */
287 11 : threadpool->run |= 0x1;
288 :
289 11 : if (APR_SUCCESS != (status = apr_thread_cond_wait(threadpool->threadpool_update, threadpool->threadpool_mutex))) {
290 0 : DEBUG_ERR("error calling apr_thread_cond_wait: %s", apr_strerror(status, errbuf, 128));
291 0 : return status;
292 : }
293 : /* DEBUG_DBG("Awake"); */
294 : /* Because all data have been processed, we are no more running */
295 11 : threadpool->run &= 0x0;
296 : }
297 : /*
298 : * garbage collecting under lock protection to avoid list manipulation
299 : * during this freeing.
300 : * We are doing this garbage collecting because if not, we will re-using
301 : * the same pool to very-often alloc cell_t structures in list.
302 : * This could lead to a memory leak if not regularly cleaned.
303 : */
304 11 : napr_list_delete(threadpool->list);
305 11 : threadpool->list = napr_list_make(threadpool->pool);
306 11 : if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
307 0 : DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
308 0 : return status;
309 : }
310 :
311 11 : return APR_SUCCESS;
312 : }
313 :
314 80 : static void *APR_THREAD_FUNC napr_threadpool_loop(apr_thread_t *thd, void *rec)
315 : {
316 : char errbuf[128];
317 80 : napr_threadpool_t *threadpool = rec;
318 : apr_status_t status;
319 :
320 : /* lock the mutex, to access the list exclusively. */
321 80 : if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
322 0 : DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
323 0 : return NULL;
324 : }
325 :
326 : /* do forever.... (unless shutdown is requested) */
327 292 : while (!(threadpool->shutdown & 0x1)) {
328 : /* DEBUG_DBG("list_size: %lu", napr_list_size(threadpool->list)); */
329 305 : if ((0 < napr_list_size(threadpool->list)) && !(threadpool->ended & 0x1)) {
330 : napr_cell_t *cell;
331 : void *data;
332 :
333 93 : cell = napr_list_first(threadpool->list);
334 93 : data = napr_list_get(cell);
335 93 : napr_list_cdr(threadpool->list);
336 93 : if (data) {
337 : /*
338 : * unlock mutex - because other threads would be able to handle
339 : * other data waiting in the queue paralelly.
340 : */
341 93 : if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
342 0 : DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
343 0 : return NULL;
344 : }
345 93 : threadpool->process_data(threadpool->ctx, data);
346 93 : if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
347 0 : DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
348 0 : return NULL;
349 : }
350 : }
351 : }
352 : else { /* The waiting else */
353 119 : threadpool->nb_waiting += 1UL;
354 :
355 : /* DEBUG_DBG("run: %i waiting: %lu / thread: %lu", (threadpool->run & 0x1) ? 1 : 0, threadpool->nb_waiting, threadpool->nb_thread); */
356 : /* Should not broadcast if it's already ended, because it may lead to an infinite loop */
357 119 : if (!(threadpool->ended & 0x1) && (threadpool->run & 0x1) && (threadpool->nb_waiting == threadpool->nb_thread)) {
358 11 : threadpool->ended |= 0x1;
359 11 : if (APR_SUCCESS != (status = apr_thread_cond_broadcast(threadpool->threadpool_update))) {
360 0 : DEBUG_ERR("error calling apr_thread_cond_signal: %s", apr_strerror(status, errbuf, 128));
361 0 : return NULL;
362 : }
363 : }
364 :
365 : /*
366 : * wait for a new data. note the mutex will be unlocked in
367 : * apr_thread_cond_wait(), thus allowing other threads access to data
368 : * list.
369 : */
370 119 : if (APR_SUCCESS != (status = apr_thread_cond_wait(threadpool->threadpool_update, threadpool->threadpool_mutex))) {
371 0 : DEBUG_ERR("error calling apr_thread_cond_wait: %s", apr_strerror(status, errbuf, 128));
372 0 : return NULL;
373 : }
374 : /*
375 : * after we return from apr_thread_cond_wait, the mutex is locked
376 : * again, so we don't need to lock it ourselves
377 : */
378 119 : threadpool->nb_waiting -= 1UL;
379 : }
380 : }
381 :
382 : /* Unlock mutex before exiting thread */
383 80 : if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
384 0 : DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
385 : }
386 :
387 80 : return NULL;
388 : }
389 :
390 11 : extern apr_status_t napr_threadpool_shutdown(napr_threadpool_t *threadpool)
391 : {
392 : char errbuf[128];
393 : apr_status_t status, rv;
394 : unsigned long l;
395 :
396 : /* Lock mutex to set shutdown flag */
397 11 : if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
398 0 : DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
399 0 : return status;
400 : }
401 :
402 : /* Signal all threads to shutdown */
403 11 : threadpool->shutdown |= 0x1;
404 :
405 : /* Broadcast to wake up all waiting threads */
406 11 : if (APR_SUCCESS != (status = apr_thread_cond_broadcast(threadpool->threadpool_update))) {
407 0 : DEBUG_ERR("error calling apr_thread_cond_broadcast: %s", apr_strerror(status, errbuf, 128));
408 0 : apr_thread_mutex_unlock(threadpool->threadpool_mutex);
409 0 : return status;
410 : }
411 :
412 11 : if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
413 0 : DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
414 0 : return status;
415 : }
416 :
417 : /* Join all threads to wait for them to exit */
418 91 : for (l = 0; l < threadpool->nb_thread; l++) {
419 80 : if (APR_SUCCESS != (status = apr_thread_join(&rv, threadpool->thread[l]))) {
420 0 : DEBUG_ERR("error calling apr_thread_join: %s", apr_strerror(status, errbuf, 128));
421 0 : return status;
422 : }
423 : }
424 :
425 11 : return APR_SUCCESS;
426 : }
|