ftwin 0.8.10
ft_process.c
1#include <apr_thread_mutex.h>
2
3#include "debug.h"
4#include "ft_config.h"
5#include "ft_process.h"
6#include "ft_system.h"
7#include "ft_types.h"
8#include "napr_threadpool.h"
9#include "napr_hash.h"
10#include <apr_file_io.h>
11#include "ft_file.h"
12
13int ft_file_cmp(const void *param1, const void *param2);
14
15#include <archive.h>
16#include "ft_archive.h"
17
18static apr_status_t hashing_worker_callback(void *ctx, void *data)
19{
20 char errbuf[ERROR_BUFFER_SIZE];
21 hashing_context_t *h_ctx = (hashing_context_t *) ctx;
22 hashing_task_t *task = (hashing_task_t *) data;
23 ft_fsize_t *fsize = task->fsize;
24 ft_file_t *file = fsize->chksum_array[task->index].file;
25 apr_pool_t *subpool = NULL;
26 apr_status_t status = APR_SUCCESS;
27 char *filepath = NULL;
28
29 memset(errbuf, 0, sizeof(errbuf));
30 status = apr_pool_create(&subpool, h_ctx->pool);
31 if (APR_SUCCESS != status) {
32 DEBUG_ERR("error calling apr_pool_create: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
33 return status;
34 }
35
36 if (is_option_set(h_ctx->conf->mask, OPTION_UNTAR) && (NULL != file->subpath)) {
37 filepath = ft_archive_untar_file(file, subpool);
38 if (NULL == filepath) {
39 DEBUG_ERR("error calling ft_archive_untar_file");
40 apr_pool_destroy(subpool);
41 return APR_EGENERAL;
42 }
43 }
44 else {
45 filepath = file->path;
46 }
47
48 status = checksum_file(filepath, file->size, h_ctx->conf->excess_size,
49 &fsize->chksum_array[task->index].hash_value, subpool);
50
51 if (is_option_set(h_ctx->conf->mask, OPTION_UNTAR) && (NULL != file->subpath)) {
52 (void) apr_file_remove(filepath, subpool);
53 }
54
55 if (APR_SUCCESS == status) {
56 apr_status_t lock_status = APR_SUCCESS;
57
58 lock_status = apr_thread_mutex_lock(h_ctx->stats_mutex);
59 if (APR_SUCCESS == lock_status) {
60 h_ctx->files_processed++;
61
62 if (is_option_set(h_ctx->conf->mask, OPTION_VERBO)) {
63 (void) fprintf(stderr, "\rProgress [%" APR_SIZE_T_FMT "/%" APR_SIZE_T_FMT "] %d%% ",
64 h_ctx->files_processed, h_ctx->total_files,
65 (int) ((float) h_ctx->files_processed / (float) h_ctx->total_files * 100.0f));
66 }
67
68 apr_thread_mutex_unlock(h_ctx->stats_mutex);
69 }
70 }
71 else {
72 if (is_option_set(h_ctx->conf->mask, OPTION_VERBO)) {
73 (void) fprintf(stderr, "\nskipping %s because: %s\n", file->path,
74 apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
75 }
76 }
77
78 apr_pool_destroy(subpool);
79
80 return status;
81}
82
83/* Forward declarations for helper functions */
84static apr_status_t categorize_files(ft_conf_t *conf, napr_heap_t *tmp_heap, apr_size_t *total_hash_tasks);
85static apr_status_t dispatch_hashing_tasks(ft_conf_t *conf, apr_pool_t *gc_pool, apr_size_t total_hash_tasks);
86static apr_status_t collect_hashing_results(ft_conf_t *conf, napr_heap_t *tmp_heap, apr_pool_t *gc_pool);
87
88apr_status_t ft_process_files(ft_conf_t *conf)
89{
90 napr_heap_t *tmp_heap = NULL;
91 apr_pool_t *gc_pool = NULL;
92 apr_status_t status = APR_SUCCESS;
93 apr_size_t total_hash_tasks = 0;
94
95 if (is_option_set(conf->mask, OPTION_VERBO)) {
96 (void) fprintf(stderr, "Referencing files and sizes:\n");
97 }
98
99 status = apr_pool_create(&gc_pool, conf->pool);
100 if (APR_SUCCESS != status) {
101 char errbuf[ERROR_BUFFER_SIZE];
102 DEBUG_ERR("error calling apr_pool_create: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
103 return status;
104 }
105
106 tmp_heap = napr_heap_make(conf->pool, ft_file_cmp);
107
108 status = categorize_files(conf, tmp_heap, &total_hash_tasks);
109 if (status != APR_SUCCESS) {
110 apr_pool_destroy(gc_pool);
111 return status;
112 }
113
114 if (total_hash_tasks > 0) {
115 status = dispatch_hashing_tasks(conf, gc_pool, total_hash_tasks);
116 if (status != APR_SUCCESS) {
117 apr_pool_destroy(gc_pool);
118 return status;
119 }
120
121 status = collect_hashing_results(conf, tmp_heap, gc_pool);
122 if (status != APR_SUCCESS) {
123 apr_pool_destroy(gc_pool);
124 return status;
125 }
126 }
127
128 apr_pool_destroy(gc_pool);
129 conf->heap = tmp_heap;
130
131 return APR_SUCCESS;
132}
133
134static apr_status_t categorize_files(ft_conf_t *conf, napr_heap_t *tmp_heap, apr_size_t *total_hash_tasks)
135{
136 ft_file_t *file;
137 ft_fsize_t *fsize;
138 apr_uint32_t hash_value;
139
140 while (NULL != (file = napr_heap_extract(conf->heap))) {
141 fsize = napr_hash_search(conf->sizes, &file->size, sizeof(apr_off_t), &hash_value);
142 if (NULL != fsize) {
143 if (1 == fsize->nb_files) {
144 napr_hash_remove(conf->sizes, fsize, hash_value);
145 }
146 else {
147 if (NULL == fsize->chksum_array)
148 fsize->chksum_array = apr_palloc(conf->pool, fsize->nb_files * sizeof(struct ft_chksum_t));
149
150 fsize->chksum_array[fsize->nb_checksumed].file = file;
151
152 if (((2 == fsize->nb_files) || (0 == fsize->val)) && !is_option_set(conf->mask, OPTION_JSON)) {
153 memset(&fsize->chksum_array[fsize->nb_checksumed].hash_value, 0, sizeof(ft_hash_t));
154 fsize->nb_checksumed++;
155 napr_heap_insert(tmp_heap, file);
156 }
157 else {
158 (*total_hash_tasks)++;
159 fsize->nb_checksumed++;
160 }
161 }
162 }
163 else {
164 DEBUG_ERR("inconsistency error found, no size[%" APR_OFF_T_FMT "] in hash for file %s", file->size, file->path);
165 return APR_EGENERAL;
166 }
167 }
168 return APR_SUCCESS;
169}
170
171static apr_status_t dispatch_hashing_tasks(ft_conf_t *conf, apr_pool_t *gc_pool, apr_size_t total_hash_tasks)
172{
173 char errbuf[ERROR_BUFFER_SIZE];
174 ft_fsize_t *fsize;
175 napr_threadpool_t *threadpool = NULL;
176 hashing_context_t h_ctx;
177 apr_status_t status;
178
179 h_ctx.conf = conf;
180 h_ctx.pool = gc_pool;
181 h_ctx.files_processed = 0;
182 h_ctx.total_files = total_hash_tasks;
183
184 status = apr_thread_mutex_create(&h_ctx.stats_mutex, APR_THREAD_MUTEX_DEFAULT, gc_pool);
185 if (APR_SUCCESS != status) {
186 DEBUG_ERR("error calling apr_thread_mutex_create: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
187 return status;
188 }
189
190 status = napr_threadpool_init(&threadpool, &h_ctx, conf->num_threads, hashing_worker_callback, gc_pool);
191 if (APR_SUCCESS != status) {
192 DEBUG_ERR("error calling napr_threadpool_init: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
193 apr_thread_mutex_destroy(h_ctx.stats_mutex);
194 return status;
195 }
196
197 for (napr_hash_index_t *hash_index = napr_hash_first(gc_pool, conf->sizes); hash_index;
198 hash_index = napr_hash_next(hash_index)) {
199 napr_hash_this(hash_index, NULL, NULL, (void **) &fsize);
200
201 int should_hash = (fsize->nb_files > 2) && (0 != fsize->val);
202 if (is_option_set(conf->mask, OPTION_JSON)) {
203 should_hash = (fsize->nb_files >= 2);
204 }
205
206 if (should_hash) {
207 for (apr_uint32_t idx = 0; idx < fsize->nb_files; idx++) {
208 if (NULL != fsize->chksum_array[idx].file) {
209 hashing_task_t *task = apr_palloc(gc_pool, sizeof(hashing_task_t));
210 task->fsize = fsize;
211 task->index = idx;
212
213 status = napr_threadpool_add(threadpool, task);
214 if (APR_SUCCESS != status) {
215 DEBUG_ERR("error calling napr_threadpool_add: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
216 napr_threadpool_wait(threadpool);
217 apr_thread_mutex_destroy(h_ctx.stats_mutex);
218 return status;
219 }
220 }
221 }
222 }
223 }
224
225 napr_threadpool_wait(threadpool);
226
227 status = apr_thread_mutex_destroy(h_ctx.stats_mutex);
228 if (APR_SUCCESS != status) {
229 DEBUG_ERR("error calling apr_thread_mutex_destroy: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
230 }
231
232 if (is_option_set(conf->mask, OPTION_VERBO)) {
233 (void) fprintf(stderr, "\n");
234 }
235
236 status = napr_threadpool_shutdown(threadpool);
237 if (APR_SUCCESS != status) {
238 DEBUG_ERR("error calling napr_threadpool_shutdown: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
239 }
240
241 return APR_SUCCESS;
242}
243
244static apr_status_t collect_hashing_results(ft_conf_t *conf, napr_heap_t *tmp_heap, apr_pool_t *gc_pool)
245{
246 ft_fsize_t *fsize;
247
248 for (napr_hash_index_t *hash_index = napr_hash_first(gc_pool, conf->sizes); hash_index;
249 hash_index = napr_hash_next(hash_index)) {
250 napr_hash_this(hash_index, NULL, NULL, (void **) &fsize);
251
252 int should_insert = (fsize->nb_files > 2) && (0 != fsize->val);
253 if (is_option_set(conf->mask, OPTION_JSON)) {
254 should_insert = (fsize->nb_files >= 2);
255 }
256
257 if (should_insert) {
258 for (apr_uint32_t idx = 0; idx < fsize->nb_files; idx++) {
259 if (NULL != fsize->chksum_array[idx].file) {
260 napr_heap_insert(tmp_heap, fsize->chksum_array[idx].file);
261 }
262 }
263 }
264 }
265
266 return APR_SUCCESS;
267}
UTIL debug output macros.
#define DEBUG_ERR(str, arg...)
Display error message at the level error.
Definition debug.h:31
apr_status_t checksum_file(const char *filename, apr_off_t size, apr_off_t excess_size, ft_hash_t *hash_out, apr_pool_t *gc_pool)
Calculates the XXH128 checksum of a file.
Definition ft_file.c:134
Interface for file comparison and checksum calculation.
System-related utility functions.
napr_hash_index_t * napr_hash_first(apr_pool_t *pool, napr_hash_t *hash)
Start iterating over the entries in a hash table.
Definition napr_hash.c:280
void napr_hash_this(napr_hash_index_t *hi, const void **key, apr_size_t *klen, void **val)
Get the current entry's details from the iteration state.
Definition napr_hash.c:314
void * napr_hash_search(napr_hash_t *hash, const void *key, apr_size_t key_len, apr_uint32_t *hash_value)
Searches the hash table for an item.
Definition napr_hash.c:145
napr_hash_index_t * napr_hash_next(napr_hash_index_t *index)
Continue iterating over the entries in a hash table.
Definition napr_hash.c:294
void napr_hash_remove(napr_hash_t *hash, void *data, apr_uint32_t hash_value)
Removes an item from the hash table.
Definition napr_hash.c:210
A high-performance hash table implementation built on APR.
void * napr_heap_extract(napr_heap_t *heap)
Removes and returns the element at the top of the heap (the min or max element).
Definition napr_heap.c:122
napr_heap_t * napr_heap_make(apr_pool_t *pool, napr_heap_cmp_callback_fn_t *cmp)
Creates a new heap.
Definition napr_heap.c:48
int napr_heap_insert(napr_heap_t *heap, void *datum)
Inserts an element into the heap, maintaining the heap property.
Definition napr_heap.c:70
struct napr_heap_t napr_heap_t
Opaque heap structure.
Definition napr_heap.h:28
apr_status_t napr_threadpool_shutdown(napr_threadpool_t *threadpool)
Shuts down the thread pool.
apr_status_t napr_threadpool_wait(napr_threadpool_t *threadpool)
Waits until all tasks currently in the queue have been processed.
apr_status_t napr_threadpool_init(napr_threadpool_t **threadpool, void *ctx, unsigned long nb_thread, threadpool_process_data_callback_fn_t *process_data, apr_pool_t *pool)
Initializes a thread pool.
apr_status_t napr_threadpool_add(napr_threadpool_t *threadpool, void *data)
Adds a task (a data item) to the thread pool's processing queue.
A simple fixed-size thread pool for concurrent task processing.
struct napr_threadpool_t napr_threadpool_t
Opaque thread pool structure.
The return value from 128-bit hashes.
Definition xxhash.h:1239
Main configuration structure for the ftwin application.
Definition ft_config.h:94
Shared context for parallel hashing operations.
Definition ft_types.h:115
Task structure passed to worker threads for hashing individual files.
Definition ft_types.h:105