1#include <apr_thread_mutex.h>
10#include <apr_file_io.h>
/* NOTE(review): local prototype for a function defined elsewhere. Its
 * (const void *, const void *) -> int shape suggests a qsort/heap-style
 * comparator over file entries — confirm, and consider declaring it in a
 * header instead of between #include lines. */
13int ft_file_cmp(
const void *param1,
const void *param2);
16#include "ft_archive.h"
/**
 * Thread-pool worker: computes the checksum of one file of a size group.
 *
 * Registered as the processing callback in napr_threadpool_init() by
 * dispatch_hashing_tasks(), which passes &h_ctx as the pool context.
 *
 * @param ctx  shared hashing context (presumably the h_ctx given to
 *             napr_threadpool_init); this function reads its pool, conf,
 *             stats_mutex, files_processed and total_files.
 * @param data one hashing task (presumably queued via napr_threadpool_add);
 *             this function reads its fsize and index.
 * @return apr_status_t — the return statements are not visible in this
 *         listing; TODO confirm what is returned on each path.
 */
18static apr_status_t hashing_worker_callback(
void *ctx,
void *data)
    /* Scratch buffer for apr_strerror() messages. */
20 char errbuf[ERROR_BUFFER_SIZE];
    /* The task selects one slot of its size group's checksum array. */
23 ft_fsize_t *fsize = task->fsize;
24 ft_file_t *file = fsize->chksum_array[task->index].file;
25 apr_pool_t *subpool = NULL;
26 apr_status_t status = APR_SUCCESS;
27 char *filepath = NULL;
29 memset(errbuf, 0,
sizeof(errbuf));
    /* Per-task subpool: every allocation made while hashing this file is
     * released by the single apr_pool_destroy() at the end. */
30 status = apr_pool_create(&subpool, h_ctx->pool);
31 if (APR_SUCCESS != status) {
32 DEBUG_ERR(
"error calling apr_pool_create: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
    /* Archive members (untar option set and file->subpath present) are hashed
     * from the path returned by ft_archive_untar_file(); that path is removed
     * again after hashing. */
36 if (is_option_set(h_ctx->conf->mask, OPTION_UNTAR) && (NULL != file->subpath)) {
37 filepath = ft_archive_untar_file(file, subpool);
38 if (NULL == filepath) {
39 DEBUG_ERR(
"error calling ft_archive_untar_file");
40 apr_pool_destroy(subpool);
    /* Regular files are hashed in place. */
45 filepath = file->path;
    /* Each task writes only its own slot, chksum_array[task->index], so no
     * lock is taken here. NOTE(review): this relies on every queued task
     * having a distinct index; task construction is not visible here. */
48 status =
checksum_file(filepath, file->size, h_ctx->conf->excess_size,
49 &fsize->chksum_array[task->index].hash_value, subpool);
    /* Best-effort removal of the extracted temporary file; the result is
     * deliberately discarded. */
51 if (is_option_set(h_ctx->conf->mask, OPTION_UNTAR) && (NULL != file->subpath)) {
52 (void) apr_file_remove(filepath, subpool);
55 if (APR_SUCCESS == status) {
56 apr_status_t lock_status = APR_SUCCESS;
    /* The progress counter and the progress line are shared by all workers
     * and are serialized by stats_mutex. If the lock cannot be taken, the
     * counter is simply not advanced. */
58 lock_status = apr_thread_mutex_lock(h_ctx->stats_mutex);
59 if (APR_SUCCESS == lock_status) {
60 h_ctx->files_processed++;
    /* "\r" redraws the progress line in place; the percentage is computed
     * in float and truncated to int. */
62 if (is_option_set(h_ctx->conf->mask, OPTION_VERBO)) {
63 (void) fprintf(stderr,
"\rProgress [%" APR_SIZE_T_FMT
"/%" APR_SIZE_T_FMT
"] %d%% ",
64 h_ctx->files_processed, h_ctx->total_files,
65 (
int) ((float) h_ctx->files_processed / (
float) h_ctx->total_files * 100.0f));
    /* NOTE(review): the return value of apr_thread_mutex_unlock() is ignored. */
68 apr_thread_mutex_unlock(h_ctx->stats_mutex);
    /* Presumably the failure branch of checksum_file(): in verbose mode,
     * report which file was skipped and why. */
72 if (is_option_set(h_ctx->conf->mask, OPTION_VERBO)) {
73 (void) fprintf(stderr,
"\nskipping %s because: %s\n", file->path,
74 apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
    /* Release every per-task allocation. */
78 apr_pool_destroy(subpool);
/* Phase 1: give each file a slot in its size group's chksum_array and count
 * how many files must actually be hashed (*total_hash_tasks). */
84static apr_status_t categorize_files(
ft_conf_t *conf, napr_heap_t *tmp_heap, apr_size_t *total_hash_tasks);
/* Phase 2: hash the selected files on a thread pool; scratch data (context
 * mutex, hash iterators) is allocated from gc_pool. */
85static apr_status_t dispatch_hashing_tasks(
ft_conf_t *conf, apr_pool_t *gc_pool, apr_size_t total_hash_tasks);
/* Phase 3: walk the size groups again and (presumably) insert the hashed
 * entries into tmp_heap. */
86static apr_status_t collect_hashing_results(
ft_conf_t *conf, napr_heap_t *tmp_heap, apr_pool_t *gc_pool);
/**
 * Checksums the candidate files and publishes the resulting heap.
 *
 * Runs three phases:
 *   1. categorize_files() assigns checksum slots and counts hash tasks.
 *   2. dispatch_hashing_tasks() hashes them in parallel; skipped when there
 *      is nothing to hash.
 *   3. collect_hashing_results() fills tmp_heap.
 * Scratch memory comes from gc_pool, a child of conf->pool, which is
 * destroyed on every visible exit path. On success tmp_heap is stored in
 * conf->heap.
 *
 * @param conf ftwin configuration (mask, pool, sizes, heap).
 * @return apr_status_t — the return statements are not visible in this
 *         listing; each failing phase destroys gc_pool before leaving.
 */
88apr_status_t ft_process_files(
ft_conf_t *conf)
91 apr_pool_t *gc_pool = NULL;
92 apr_status_t status = APR_SUCCESS;
93 apr_size_t total_hash_tasks = 0;
95 if (is_option_set(conf->mask, OPTION_VERBO)) {
96 (void) fprintf(stderr,
"Referencing files and sizes:\n");
    /* Scratch pool for this whole operation. */
99 status = apr_pool_create(&gc_pool, conf->pool);
100 if (APR_SUCCESS != status) {
101 char errbuf[ERROR_BUFFER_SIZE];
102 DEBUG_ERR(
"error calling apr_pool_create: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
    /* Phase 1: assign slots and count hash tasks. */
108 status = categorize_files(conf, tmp_heap, &total_hash_tasks);
109 if (status != APR_SUCCESS) {
110 apr_pool_destroy(gc_pool);
    /* Phase 2: only start the thread pool if some file needs hashing. */
114 if (total_hash_tasks > 0) {
115 status = dispatch_hashing_tasks(conf, gc_pool, total_hash_tasks);
116 if (status != APR_SUCCESS) {
117 apr_pool_destroy(gc_pool);
    /* Phase 3: gather the results into tmp_heap. */
121 status = collect_hashing_results(conf, tmp_heap, gc_pool);
122 if (status != APR_SUCCESS) {
123 apr_pool_destroy(gc_pool);
    /* NOTE(review): tmp_heap is published after gc_pool is destroyed, so it
     * must not be allocated from gc_pool. Its creation is not visible here;
     * confirm it comes from conf->pool or another longer-lived pool. */
128 apr_pool_destroy(gc_pool);
129 conf->heap = tmp_heap;
/**
 * Assigns each file a slot in its size group and counts required hash tasks.
 *
 * For each file (the iteration itself is not visible in this listing), the
 * size group is looked up in conf->sizes by file->size. The file is then
 * recorded in the next free slot (nb_checksumed) of the group's chksum_array.
 *
 * @param conf             configuration; conf->sizes is the size-group hash
 *                         and conf->pool backs chksum_array.
 * @param tmp_heap         result heap. It is not used on the visible lines;
 *                         presumably used on lines not shown — TODO confirm.
 * @param total_hash_tasks incremented once per file that must be hashed.
 * @return apr_status_t — the return statements are not visible here.
 */
134static apr_status_t categorize_files(
ft_conf_t *conf, napr_heap_t *tmp_heap, apr_size_t *total_hash_tasks)
    /* Filled by napr_hash_search(). Presumably reused on lines not shown
     * (e.g. napr_hash_remove) — confirm. */
138 apr_uint32_t hash_value;
141 fsize =
napr_hash_search(conf->sizes, &file->size,
sizeof(apr_off_t), &hash_value);
    /* NOTE(review): fsize is dereferenced below. The "inconsistency error"
     * branch at the end suggests a NULL check on a line not visible here;
     * confirm it guards this dereference. */
    /* Groups with a single file are handled separately; that handling is not
     * visible in this listing. */
143 if (1 == fsize->nb_files) {
    /* chksum_array is allocated lazily with one slot per file in the group.
     * It comes from conf->pool rather than the scratch pool, so it outlives
     * this operation.
     * NOTE(review): the size multiplication and the apr_palloc() result are
     * not checked. */
147 if (NULL == fsize->chksum_array)
148 fsize->chksum_array = apr_palloc(conf->pool, fsize->nb_files *
sizeof(
struct ft_chksum_t));
150 fsize->chksum_array[fsize->nb_checksumed].file = file;
    /* Outside JSON mode, groups of exactly two files, or with val == 0, get an
     * all-zero hash instead of a hash task. dispatch_hashing_tasks() skips
     * those same groups. NOTE(review): the meaning of fsize->val is not
     * visible here — confirm. */
152 if (((2 == fsize->nb_files) || (0 == fsize->val)) && !is_option_set(conf->mask, OPTION_JSON)) {
153 memset(&fsize->chksum_array[fsize->nb_checksumed].hash_value, 0,
sizeof(
ft_hash_t));
154 fsize->nb_checksumed++;
    /* Every other slot becomes one hash task for the thread pool. */
158 (*total_hash_tasks)++;
159 fsize->nb_checksumed++;
    /* No size group was found for a file that should have one. */
164 DEBUG_ERR(
"inconsistency error found, no size[%" APR_OFF_T_FMT
"] in hash for file %s", file->size, file->path);
/**
 * Hashes the selected files in parallel with a fixed-size thread pool.
 *
 * Sets up the shared hashing context: pool = gc_pool, progress counters, and
 * stats_mutex allocated on gc_pool. It then starts conf->num_threads workers
 * running hashing_worker_callback() and queues one task per occupied
 * chksum_array slot of every selected size group. Finally it tears down the
 * mutex and the pool.
 *
 * @param conf             configuration (mask, sizes, num_threads).
 * @param gc_pool          scratch pool for the context mutex, the thread pool
 *                         and the hash iterators.
 * @param total_hash_tasks number of tasks categorize_files() counted; used as
 *                         the progress-display denominator.
 * @return apr_status_t — the return statements are not visible here.
 */
171static apr_status_t dispatch_hashing_tasks(
ft_conf_t *conf, apr_pool_t *gc_pool, apr_size_t total_hash_tasks)
173 char errbuf[ERROR_BUFFER_SIZE];
    /* Shared context read by every worker; the progress counter is
     * protected by stats_mutex. */
180 h_ctx.pool = gc_pool;
181 h_ctx.files_processed = 0;
182 h_ctx.total_files = total_hash_tasks;
184 status = apr_thread_mutex_create(&h_ctx.stats_mutex, APR_THREAD_MUTEX_DEFAULT, gc_pool);
185 if (APR_SUCCESS != status) {
186 DEBUG_ERR(
"error calling apr_thread_mutex_create: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
190 status =
napr_threadpool_init(&threadpool, &h_ctx, conf->num_threads, hashing_worker_callback, gc_pool);
191 if (APR_SUCCESS != status) {
192 DEBUG_ERR(
"error calling napr_threadpool_init: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
193 apr_thread_mutex_destroy(h_ctx.stats_mutex);
    /* Walk all size groups and queue hashing work. */
197 for (napr_hash_index_t *hash_index =
napr_hash_first(gc_pool, conf->sizes); hash_index;
    /* This must select exactly the groups categorize_files() counted as hash
     * tasks:
     *   - outside JSON mode: more than two files and val != 0;
     *   - in JSON mode: every group with two or more files. */
201 int should_hash = (fsize->nb_files > 2) && (0 != fsize->val);
202 if (is_option_set(conf->mask, OPTION_JSON)) {
203 should_hash = (fsize->nb_files >= 2);
    /* One task per occupied slot (task construction is not visible here). */
207 for (apr_uint32_t idx = 0; idx < fsize->nb_files; idx++) {
208 if (NULL != fsize->chksum_array[idx].file) {
214 if (APR_SUCCESS != status) {
215 DEBUG_ERR(
"error calling napr_threadpool_add: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
    /* NOTE(review): tasks queued earlier may still be running in the pool's
     * worker threads, and they lock stats_mutex. Confirm the thread pool is
     * stopped or drained before the mutex is destroyed here. */
217 apr_thread_mutex_destroy(h_ctx.stats_mutex);
    /* NOTE(review): stats_mutex is destroyed before the thread pool is shut
     * down (the shutdown status is checked below). That is only safe if no
     * worker can still be executing a task. Confirm napr_threadpool_wait()
     * precedes this on the lines not visible here. */
227 status = apr_thread_mutex_destroy(h_ctx.stats_mutex);
228 if (APR_SUCCESS != status) {
229 DEBUG_ERR(
"error calling apr_thread_mutex_destroy: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
    /* Terminate the "\r"-redrawn progress line printed by the workers. */
232 if (is_option_set(conf->mask, OPTION_VERBO)) {
233 (void) fprintf(stderr,
"\n");
237 if (APR_SUCCESS != status) {
238 DEBUG_ERR(
"error calling napr_threadpool_shutdown: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
244static apr_status_t collect_hashing_results(
ft_conf_t *conf, napr_heap_t *tmp_heap, apr_pool_t *gc_pool)
248 for (napr_hash_index_t *hash_index =
napr_hash_first(gc_pool, conf->sizes); hash_index;
252 int should_insert = (fsize->nb_files > 2) && (0 != fsize->val);
253 if (is_option_set(conf->mask, OPTION_JSON)) {
254 should_insert = (fsize->nb_files >= 2);
258 for (apr_uint32_t idx = 0; idx < fsize->nb_files; idx++) {
259 if (NULL != fsize->chksum_array[idx].file) {
UTIL debug output macros.
#define DEBUG_ERR(str, arg...)
Display an error message at the error level.
apr_status_t checksum_file(const char *filename, apr_off_t size, apr_off_t excess_size, ft_hash_t *hash_out, apr_pool_t *gc_pool)
Calculates the XXH128 checksum of a file.
Interface for file comparison and checksum calculation.
System-related utility functions.
napr_hash_index_t * napr_hash_first(apr_pool_t *pool, napr_hash_t *hash)
Start iterating over the entries in a hash table.
void napr_hash_this(napr_hash_index_t *hi, const void **key, apr_size_t *klen, void **val)
Get the current entry's details from the iteration state.
void * napr_hash_search(napr_hash_t *hash, const void *key, apr_size_t key_len, apr_uint32_t *hash_value)
Searches the hash table for an item.
napr_hash_index_t * napr_hash_next(napr_hash_index_t *index)
Continue iterating over the entries in a hash table.
void napr_hash_remove(napr_hash_t *hash, void *data, apr_uint32_t hash_value)
Removes an item from the hash table.
A high-performance hash table implementation built on APR.
void * napr_heap_extract(napr_heap_t *heap)
Removes and returns the element at the top of the heap (the min or max element).
napr_heap_t * napr_heap_make(apr_pool_t *pool, napr_heap_cmp_callback_fn_t *cmp)
Creates a new heap.
int napr_heap_insert(napr_heap_t *heap, void *datum)
Inserts an element into the heap, maintaining the heap property.
struct napr_heap_t napr_heap_t
Opaque heap structure.
apr_status_t napr_threadpool_shutdown(napr_threadpool_t *threadpool)
Shuts down the thread pool.
apr_status_t napr_threadpool_wait(napr_threadpool_t *threadpool)
Waits until all tasks currently in the queue have been processed.
apr_status_t napr_threadpool_init(napr_threadpool_t **threadpool, void *ctx, unsigned long nb_thread, threadpool_process_data_callback_fn_t *process_data, apr_pool_t *pool)
Initializes a thread pool.
apr_status_t napr_threadpool_add(napr_threadpool_t *threadpool, void *data)
Adds a task (a data item) to the thread pool's processing queue.
A simple fixed-size thread pool for concurrent task processing.
struct napr_threadpool_t napr_threadpool_t
Opaque thread pool structure.
The return value from 128-bit hashes.
Main configuration structure for the ftwin application.
Shared context for parallel hashing operations.
Task structure passed to worker threads for hashing individual files.