LCOV - code coverage report
Current view: top level - src - napr_threadpool.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 73.4 % 154 113
Test Date: 2025-10-15 21:43:52 Functions: 100.0 % 13 13

            Line data    Source code
       1              : /**
       2              :  * @file napr_threadpool.c
       3              :  * @brief Implementation of the fixed-size thread pool.
       4              :  * @ingroup DataStructures
       5              :  */
       6              : /*
       7              :  * Copyright (C) 2007 François Pesce : francois.pesce (at) gmail (dot) com
       8              :  *
       9              :  * Licensed under the Apache License, Version 2.0 (the "License");
      10              :  * you may not use this file except in compliance with the License.
      11              :  * You may obtain a copy of the License at
      12              :  *
      13              :  *      http://www.apache.org/licenses/LICENSE-2.0
      14              :  *
      15              :  * Unless required by applicable law or agreed to in writing, software
      16              :  * distributed under the License is distributed on an "AS IS" BASIS,
      17              :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      18              :  * See the License for the specific language governing permissions and
      19              :  * limitations under the License.
      20              :  */
      21              : 
      22              : #include <stdlib.h>
      23              : #include <stdio.h>
      24              : 
      25              : #include <apr_thread_proc.h>
      26              : #include <apr_thread_mutex.h>
      27              : #include <apr_thread_cond.h>
      28              : 
      29              : #include "napr_threadpool.h"
      30              : #include "debug.h"
      31              : 
      32              : /* Definitions of some basic list methods, see further to find pure threadpool stuff */
      33              : typedef struct napr_cell_t napr_cell_t;
      34              : 
      35              : struct napr_cell_t
      36              : {
      37              :     napr_cell_t *next;
      38              :     void *data;
      39              : };
      40              : 
      41              : typedef struct napr_list_t napr_list_t;
      42              : 
      43              : struct napr_list_t
      44              : {
      45              :     apr_pool_t *p;
      46              :     napr_cell_t *head;
      47              :     napr_cell_t *tail;
      48              :     unsigned long nb_cells;
      49              : };
      50              : 
      51           22 : static inline napr_list_t *napr_list_make(apr_pool_t *p)
      52              : {
      53              :     apr_pool_t *local_pool;
      54              :     napr_list_t *napr_list;
      55              : 
      56           22 :     apr_pool_create(&local_pool, p);
      57           22 :     napr_list = apr_palloc(local_pool, sizeof(struct napr_list_t));
      58              : 
      59           22 :     if (napr_list != NULL) {
      60           22 :         napr_list->p = local_pool;
      61           22 :         napr_list->head = NULL;
      62           22 :         napr_list->tail = NULL;
      63           22 :         napr_list->nb_cells = 0UL;
      64              :     }
      65              : 
      66           22 :     return napr_list;
      67              : }
      68              : 
      69           11 : static int napr_list_cons(napr_list_t *napr_list, void *element)
      70              : {
      71              :     napr_cell_t *cell;
      72              :     int rc; /** return code. */
      73              : 
      74           11 :     if (NULL != (cell = (napr_cell_t *) apr_palloc(napr_list->p, sizeof(struct napr_cell_t)))) {
      75           11 :         cell->data = element;
      76           11 :         cell->next = napr_list->head;
      77           11 :         napr_list->head = cell;
      78           11 :         napr_list->nb_cells += 1;
      79              : 
      80           11 :         if (napr_list->nb_cells == 1)
      81           11 :             napr_list->tail = napr_list->head;
      82              : 
      83           11 :         rc = 0;
      84              :     }
      85              :     else {
      86            0 :         rc = -1;
      87              :     }
      88              : 
      89           11 :     return rc;
      90              : }
      91              : 
      92           93 : static inline void napr_list_cdr(napr_list_t *napr_list)
      93              : {
      94              :     void *cell;
      95              : 
      96           93 :     cell = napr_list->head->next;
      97           93 :     napr_list->nb_cells -= 1UL;
      98           93 :     napr_list->head = cell;
      99           93 : }
     100              : 
     101           11 : static inline void napr_list_delete(napr_list_t *napr_list)
     102              : {
     103           11 :     while (napr_list->head != NULL)
     104            0 :         napr_list_cdr(napr_list);
     105              : 
     106           11 :     apr_pool_destroy(napr_list->p);
     107           11 : }
     108              : 
     109           93 : static inline int napr_list_enqueue(napr_list_t *napr_list, void *element)
     110              : {
     111              :     napr_cell_t *cell;
     112           93 :     int rc = 0;
     113              : 
     114           93 :     if (0 != napr_list->nb_cells) {
     115           82 :         if (NULL != (cell = (napr_cell_t *) apr_palloc(napr_list->p, sizeof(struct napr_cell_t)))) {
     116           82 :             napr_list->nb_cells += 1UL;
     117           82 :             cell->data = element;
     118           82 :             cell->next = NULL;
     119           82 :             napr_list->tail->next = cell;
     120           82 :             napr_list->tail = cell;
     121              :         }
     122              :         else
     123            0 :             rc = -1;
     124              :     }
     125              :     else {
     126           11 :         rc = napr_list_cons(napr_list, element);
     127              :     }
     128              : 
     129           93 :     return rc;
     130              : }
     131              : 
     132           93 : static inline napr_cell_t *napr_list_first(napr_list_t *napr_list)
     133              : {
     134           93 :     return napr_list->head;
     135              : }
     136              : 
     137          223 : static inline unsigned long napr_list_size(napr_list_t *napr_list)
     138              : {
     139          223 :     return napr_list->nb_cells;
     140              : }
     141              : 
     142              : #if 0
     143              : static inline napr_cell_t *napr_list_next(napr_cell_t *cell)
     144              : {
     145              :     return cell->next;
     146              : }
     147              : #endif
     148              : 
     149           93 : static inline void *napr_list_get(napr_cell_t *cell)
     150              : {
     151           93 :     return cell->data;
     152              : }
     153              : 
     154              : /* The threadpool structures and engine */
     155              : struct napr_threadpool_t
     156              : {
     157              :     apr_thread_t **thread;
     158              : 
     159              :     void *ctx;
     160              :     /* This mutex protects everything below in writing and reading */
     161              :     apr_thread_mutex_t *threadpool_mutex;
     162              :     apr_thread_cond_t *threadpool_update;
     163              :     unsigned long nb_thread;
     164              :     unsigned long nb_waiting;
     165              :     napr_list_t *list;
     166              :     threadpool_process_data_callback_fn_t *process_data;
     167              :     apr_pool_t *pool;
     168              : 
     169              :     /*
     170              :      * run == 0x1 means we are currently processing data
     171              :      * run == 0x0 means we are creating the pool, OR (exclusive) we were
     172              :      *        processing data and encountered a end of list and all thread are
     173              :      *        waiting (no more data).
     174              :      * Algorithm : 
     175              :      * run &= 0x0;
     176              :      *     create the thread pool loops :
     177              :      *         code in the loop say :
     178              :      *         run at 0x0, we can be nb_thread == nb_waiting in a "wait" mode.
     179              :      *         This doesn't matter.
     180              :      *
     181              :      * external caller function add all data to process, then call the napr_threadpool_wait:
     182              :      *     napr_threadpool_add_data:
     183              :      *         fill list.
     184              :      *     napr_threadpool_wait:
     185              :      *         run |= 0x1;
     186              :      *
     187              :      *         code in the loop say :
     188              :      *         run at 0x1, if we are at nb_thread == nb_waiting I'd better warn
     189              :      *         the caller of the napr_threadpool_wait function that the processing
     190              :      *         is ended.
     191              :      */
     192              : 
     193              :     unsigned int run:1;
     194              :     unsigned int ended:1;
     195              :     unsigned int shutdown:1;
     196              : };
     197              : 
     198              : static void *APR_THREAD_FUNC napr_threadpool_loop(apr_thread_t *thd, void *rec);
     199              : 
     200           11 : extern apr_status_t napr_threadpool_init(napr_threadpool_t **threadpool, void *ctx, unsigned long nb_thread,
     201              :                                          threadpool_process_data_callback_fn_t *process_data, apr_pool_t *pool)
     202              : {
     203              :     char errbuf[128];
     204              :     apr_pool_t *local_pool;
     205              :     unsigned long l;
     206              :     apr_status_t status;
     207              : 
     208           11 :     apr_pool_create(&local_pool, pool);
     209           11 :     (*threadpool) = apr_palloc(local_pool, sizeof(struct napr_threadpool_t));
     210           11 :     (*threadpool)->pool = local_pool;
     211              : 
     212           11 :     if (APR_SUCCESS !=
     213              :         (status =
     214           11 :          apr_thread_mutex_create(&((*threadpool)->threadpool_mutex), APR_THREAD_MUTEX_DEFAULT, (*threadpool)->pool))) {
     215            0 :         DEBUG_ERR("error calling apr_thread_mutex_create: %s", apr_strerror(status, errbuf, 128));
     216            0 :         return status;
     217              :     }
     218           11 :     if (APR_SUCCESS != (status = apr_thread_cond_create(&((*threadpool)->threadpool_update), (*threadpool)->pool))) {
     219            0 :         DEBUG_ERR("error calling apr_thread_cond_create: %s", apr_strerror(status, errbuf, 128));
     220            0 :         return status;
     221              :     }
     222           11 :     (*threadpool)->thread = apr_palloc((*threadpool)->pool, nb_thread * sizeof(apr_thread_mutex_t *));
     223           11 :     (*threadpool)->ctx = ctx;
     224           11 :     (*threadpool)->nb_thread = nb_thread;
     225           11 :     (*threadpool)->nb_waiting = 0UL;
     226           11 :     (*threadpool)->list = napr_list_make((*threadpool)->pool);
     227           11 :     (*threadpool)->process_data = process_data;
     228           11 :     (*threadpool)->run &= 0x0;
     229           11 :     (*threadpool)->ended &= 0x0;
     230           11 :     (*threadpool)->shutdown &= 0x0;
     231              : 
     232           91 :     for (l = 0; l < nb_thread; l++) {
     233           80 :         if (APR_SUCCESS !=
     234              :             (status =
     235           80 :              apr_thread_create(&((*threadpool)->thread[l]), NULL, napr_threadpool_loop, (*threadpool),
     236           80 :                                (*threadpool)->pool))) {
     237            0 :             DEBUG_ERR("error calling apr_thread_create: %s", apr_strerror(status, errbuf, 128));
     238            0 :             return status;
     239              :         }
     240              :     }
     241              : 
     242           11 :     return APR_SUCCESS;
     243              : }
     244              : 
     245           93 : extern apr_status_t napr_threadpool_add(napr_threadpool_t *threadpool, void *data)
     246              : {
     247              :     char errbuf[128];
     248              :     apr_status_t status;
     249              : 
     250           93 :     if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
     251            0 :         DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
     252            0 :         return status;
     253              :     }
     254           93 :     threadpool->ended &= 0x0;
     255           93 :     napr_list_enqueue(threadpool->list, data);
     256           93 :     if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
     257            0 :         DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
     258            0 :         return status;
     259              :     }
     260           93 :     if (APR_SUCCESS != (status = apr_thread_cond_signal(threadpool->threadpool_update))) {
     261            0 :         DEBUG_ERR("error calling apr_thread_cond_signal: %s", apr_strerror(status, errbuf, 128));
     262            0 :         return status;
     263              :     }
     264              : 
     265           93 :     return APR_SUCCESS;
     266              : }
     267              : 
     268           11 : extern apr_status_t napr_threadpool_wait(napr_threadpool_t *threadpool)
     269              : {
     270              :     char errbuf[128];
     271              :     apr_status_t status;
     272           11 :     int list_size = 0;
     273              : 
     274              :     /* DEBUG_DBG("Called"); */
     275           11 :     if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
     276            0 :         DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
     277            0 :         return status;
     278              :     }
     279           11 :     list_size = napr_list_size(threadpool->list);
     280           11 :     if ((0 != list_size) || (threadpool->nb_waiting != threadpool->nb_thread)) {
     281              :         /* DEBUG_DBG("After lock before wait"); */
     282              :         /*
     283              :          * Because the caller of this function has added all data we are now
     284              :          * running, the line after, (cond_wait), make us wait to the end of
     285              :          * processing.
     286              :          */
     287           11 :         threadpool->run |= 0x1;
     288              : 
     289           11 :         if (APR_SUCCESS != (status = apr_thread_cond_wait(threadpool->threadpool_update, threadpool->threadpool_mutex))) {
     290            0 :             DEBUG_ERR("error calling apr_thread_cond_wait: %s", apr_strerror(status, errbuf, 128));
     291            0 :             return status;
     292              :         }
     293              :         /* DEBUG_DBG("Awake"); */
     294              :         /* Because all data have been processed, we are no more running  */
     295           11 :         threadpool->run &= 0x0;
     296              :     }
     297              :     /*
     298              :      * garbage collecting under lock protection to avoid list manipulation
     299              :      * during this freeing.
     300              :      * We are doing this garbage collecting because if not, we will re-using
     301              :      * the same pool to very-often alloc cell_t structures in list.
     302              :      * This could lead to a memory leak if not regularly cleaned.
     303              :      */
     304           11 :     napr_list_delete(threadpool->list);
     305           11 :     threadpool->list = napr_list_make(threadpool->pool);
     306           11 :     if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
     307            0 :         DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
     308            0 :         return status;
     309              :     }
     310              : 
     311           11 :     return APR_SUCCESS;
     312              : }
     313              : 
     314           80 : static void *APR_THREAD_FUNC napr_threadpool_loop(apr_thread_t *thd, void *rec)
     315              : {
     316              :     char errbuf[128];
     317           80 :     napr_threadpool_t *threadpool = rec;
     318              :     apr_status_t status;
     319              : 
     320              :     /* lock the mutex, to access the list exclusively. */
     321           80 :     if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
     322            0 :         DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
     323            0 :         return NULL;
     324              :     }
     325              : 
     326              :     /* do forever.... (unless shutdown is requested) */
     327          292 :     while (!(threadpool->shutdown & 0x1)) {
     328              :         /* DEBUG_DBG("list_size: %lu", napr_list_size(threadpool->list)); */
     329          305 :         if ((0 < napr_list_size(threadpool->list)) && !(threadpool->ended & 0x1)) {
     330              :             napr_cell_t *cell;
     331              :             void *data;
     332              : 
     333           93 :             cell = napr_list_first(threadpool->list);
     334           93 :             data = napr_list_get(cell);
     335           93 :             napr_list_cdr(threadpool->list);
     336           93 :             if (data) {
     337              :                 /*
     338              :                  * unlock mutex - because other threads would be able to handle
     339              :                  * other data waiting in the queue paralelly.
     340              :                  */
     341           93 :                 if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
     342            0 :                     DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
     343            0 :                     return NULL;
     344              :                 }
     345           93 :                 threadpool->process_data(threadpool->ctx, data);
     346           93 :                 if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
     347            0 :                     DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
     348            0 :                     return NULL;
     349              :                 }
     350              :             }
     351              :         }
     352              :         else {                  /* The waiting else */
     353          119 :             threadpool->nb_waiting += 1UL;
     354              : 
     355              :             /* DEBUG_DBG("run: %i waiting: %lu / thread: %lu", (threadpool->run & 0x1) ? 1 : 0, threadpool->nb_waiting, threadpool->nb_thread); */
     356              :             /* Should not broadcast if it's already ended, because it may lead to an infinite loop */
     357          119 :             if (!(threadpool->ended & 0x1) && (threadpool->run & 0x1) && (threadpool->nb_waiting == threadpool->nb_thread)) {
     358           11 :                 threadpool->ended |= 0x1;
     359           11 :                 if (APR_SUCCESS != (status = apr_thread_cond_broadcast(threadpool->threadpool_update))) {
     360            0 :                     DEBUG_ERR("error calling apr_thread_cond_signal: %s", apr_strerror(status, errbuf, 128));
     361            0 :                     return NULL;
     362              :                 }
     363              :             }
     364              : 
     365              :             /*
     366              :              * wait for a new data. note the mutex will be unlocked in
     367              :              * apr_thread_cond_wait(), thus allowing other threads access to data
     368              :              * list.
     369              :              */
     370          119 :             if (APR_SUCCESS != (status = apr_thread_cond_wait(threadpool->threadpool_update, threadpool->threadpool_mutex))) {
     371            0 :                 DEBUG_ERR("error calling apr_thread_cond_wait: %s", apr_strerror(status, errbuf, 128));
     372            0 :                 return NULL;
     373              :             }
     374              :             /*
     375              :              * after we return from apr_thread_cond_wait, the mutex is locked
     376              :              * again, so we don't need to lock it ourselves
     377              :              */
     378          119 :             threadpool->nb_waiting -= 1UL;
     379              :         }
     380              :     }
     381              : 
     382              :     /* Unlock mutex before exiting thread */
     383           80 :     if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
     384            0 :         DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
     385              :     }
     386              : 
     387           80 :     return NULL;
     388              : }
     389              : 
     390           11 : extern apr_status_t napr_threadpool_shutdown(napr_threadpool_t *threadpool)
     391              : {
     392              :     char errbuf[128];
     393              :     apr_status_t status, rv;
     394              :     unsigned long l;
     395              : 
     396              :     /* Lock mutex to set shutdown flag */
     397           11 :     if (APR_SUCCESS != (status = apr_thread_mutex_lock(threadpool->threadpool_mutex))) {
     398            0 :         DEBUG_ERR("error calling apr_thread_mutex_lock: %s", apr_strerror(status, errbuf, 128));
     399            0 :         return status;
     400              :     }
     401              : 
     402              :     /* Signal all threads to shutdown */
     403           11 :     threadpool->shutdown |= 0x1;
     404              : 
     405              :     /* Broadcast to wake up all waiting threads */
     406           11 :     if (APR_SUCCESS != (status = apr_thread_cond_broadcast(threadpool->threadpool_update))) {
     407            0 :         DEBUG_ERR("error calling apr_thread_cond_broadcast: %s", apr_strerror(status, errbuf, 128));
     408            0 :         apr_thread_mutex_unlock(threadpool->threadpool_mutex);
     409            0 :         return status;
     410              :     }
     411              : 
     412           11 :     if (APR_SUCCESS != (status = apr_thread_mutex_unlock(threadpool->threadpool_mutex))) {
     413            0 :         DEBUG_ERR("error calling apr_thread_mutex_unlock: %s", apr_strerror(status, errbuf, 128));
     414            0 :         return status;
     415              :     }
     416              : 
     417              :     /* Join all threads to wait for them to exit */
     418           91 :     for (l = 0; l < threadpool->nb_thread; l++) {
     419           80 :         if (APR_SUCCESS != (status = apr_thread_join(&rv, threadpool->thread[l]))) {
     420            0 :             DEBUG_ERR("error calling apr_thread_join: %s", apr_strerror(status, errbuf, 128));
     421            0 :             return status;
     422              :         }
     423              :     }
     424              : 
     425           11 :     return APR_SUCCESS;
     426              : }
        

Generated by: LCOV version 2.0-1