Line data Source code
1 : #include <apr_thread_mutex.h>
2 :
3 : #include "debug.h"
4 : #include "ft_config.h"
5 : #include "ft_process.h"
6 : #include "ft_system.h"
7 : #include "ft_types.h"
8 : #include "napr_threadpool.h"
9 : #include "napr_hash.h"
10 : #include <apr_file_io.h>
11 : #include "ft_file.h"
12 :
13 : int ft_file_cmp(const void *param1, const void *param2);
14 :
15 : #include <archive.h>
16 : #include "ft_archive.h"
17 :
18 93 : static apr_status_t hashing_worker_callback(void *ctx, void *data)
19 93 : {
20 93 : char errbuf[ERROR_BUFFER_SIZE];
21 93 : hashing_context_t *h_ctx = (hashing_context_t *) ctx;
22 93 : hashing_task_t *task = (hashing_task_t *) data;
23 93 : ft_fsize_t *fsize = task->fsize;
24 93 : ft_file_t *file = fsize->chksum_array[task->index].file;
25 93 : apr_pool_t *subpool = NULL;
26 93 : apr_status_t status = APR_SUCCESS;
27 93 : char *filepath = NULL;
28 :
29 93 : memset(errbuf, 0, sizeof(errbuf));
30 93 : status = apr_pool_create(&subpool, h_ctx->pool);
31 93 : if (APR_SUCCESS != status) {
32 0 : DEBUG_ERR("error calling apr_pool_create: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
33 0 : return status;
34 : }
35 :
36 93 : if (is_option_set(h_ctx->conf->mask, OPTION_UNTAR) && (NULL != file->subpath)) {
37 0 : filepath = ft_archive_untar_file(file, subpool);
38 0 : if (NULL == filepath) {
39 0 : DEBUG_ERR("error calling ft_archive_untar_file");
40 0 : apr_pool_destroy(subpool);
41 0 : return APR_EGENERAL;
42 : }
43 : }
44 : else {
45 93 : filepath = file->path;
46 : }
47 :
48 93 : status = checksum_file(filepath, file->size, h_ctx->conf->excess_size,
49 93 : &fsize->chksum_array[task->index].hash_value, subpool);
50 :
51 93 : if (is_option_set(h_ctx->conf->mask, OPTION_UNTAR) && (NULL != file->subpath)) {
52 0 : (void) apr_file_remove(filepath, subpool);
53 : }
54 :
55 93 : if (APR_SUCCESS == status) {
56 93 : apr_status_t lock_status = APR_SUCCESS;
57 :
58 93 : lock_status = apr_thread_mutex_lock(h_ctx->stats_mutex);
59 93 : if (APR_SUCCESS == lock_status) {
60 93 : h_ctx->files_processed++;
61 :
62 93 : if (is_option_set(h_ctx->conf->mask, OPTION_VERBO)) {
63 0 : (void) fprintf(stderr, "\rProgress [%" APR_SIZE_T_FMT "/%" APR_SIZE_T_FMT "] %d%% ",
64 : h_ctx->files_processed, h_ctx->total_files,
65 0 : (int) ((float) h_ctx->files_processed / (float) h_ctx->total_files * 100.0f));
66 : }
67 :
68 93 : apr_thread_mutex_unlock(h_ctx->stats_mutex);
69 : }
70 : }
71 : else {
72 0 : if (is_option_set(h_ctx->conf->mask, OPTION_VERBO)) {
73 0 : (void) fprintf(stderr, "\nskipping %s because: %s\n", file->path,
74 : apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
75 : }
76 : }
77 :
78 93 : apr_pool_destroy(subpool);
79 :
80 93 : return status;
81 : }
82 :
83 : /* Forward declarations for helper functions */
84 : static apr_status_t categorize_files(ft_conf_t *conf, napr_heap_t *tmp_heap, apr_size_t *total_hash_tasks);
85 : static apr_status_t dispatch_hashing_tasks(ft_conf_t *conf, apr_pool_t *gc_pool, apr_size_t total_hash_tasks);
86 : static apr_status_t collect_hashing_results(ft_conf_t *conf, napr_heap_t *tmp_heap, apr_pool_t *gc_pool);
87 :
88 17 : apr_status_t ft_process_files(ft_conf_t *conf)
89 : {
90 17 : napr_heap_t *tmp_heap = NULL;
91 17 : apr_pool_t *gc_pool = NULL;
92 17 : apr_status_t status = APR_SUCCESS;
93 17 : apr_size_t total_hash_tasks = 0;
94 :
95 17 : if (is_option_set(conf->mask, OPTION_VERBO)) {
96 0 : (void) fprintf(stderr, "Referencing files and sizes:\n");
97 : }
98 :
99 17 : status = apr_pool_create(&gc_pool, conf->pool);
100 17 : if (APR_SUCCESS != status) {
101 0 : char errbuf[ERROR_BUFFER_SIZE];
102 0 : DEBUG_ERR("error calling apr_pool_create: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
103 0 : return status;
104 : }
105 :
106 17 : tmp_heap = napr_heap_make(conf->pool, ft_file_cmp);
107 :
108 17 : status = categorize_files(conf, tmp_heap, &total_hash_tasks);
109 17 : if (status != APR_SUCCESS) {
110 0 : apr_pool_destroy(gc_pool);
111 0 : return status;
112 : }
113 :
114 17 : if (total_hash_tasks > 0) {
115 11 : status = dispatch_hashing_tasks(conf, gc_pool, total_hash_tasks);
116 11 : if (status != APR_SUCCESS) {
117 0 : apr_pool_destroy(gc_pool);
118 0 : return status;
119 : }
120 :
121 11 : status = collect_hashing_results(conf, tmp_heap, gc_pool);
122 11 : if (status != APR_SUCCESS) {
123 0 : apr_pool_destroy(gc_pool);
124 0 : return status;
125 : }
126 : }
127 :
128 17 : apr_pool_destroy(gc_pool);
129 17 : conf->heap = tmp_heap;
130 :
131 17 : return APR_SUCCESS;
132 : }
133 :
134 17 : static apr_status_t categorize_files(ft_conf_t *conf, napr_heap_t *tmp_heap, apr_size_t *total_hash_tasks)
135 : {
136 : ft_file_t *file;
137 : ft_fsize_t *fsize;
138 : apr_uint32_t hash_value;
139 :
140 127 : while (NULL != (file = napr_heap_extract(conf->heap))) {
141 110 : fsize = napr_hash_search(conf->sizes, &file->size, sizeof(apr_off_t), &hash_value);
142 110 : if (NULL != fsize) {
143 110 : if (1 == fsize->nb_files) {
144 4 : napr_hash_remove(conf->sizes, fsize, hash_value);
145 : }
146 : else {
147 106 : if (NULL == fsize->chksum_array)
148 36 : fsize->chksum_array = apr_palloc(conf->pool, fsize->nb_files * sizeof(struct ft_chksum_t));
149 :
150 106 : fsize->chksum_array[fsize->nb_checksumed].file = file;
151 :
152 106 : if (((2 == fsize->nb_files) || (0 == fsize->val)) && !is_option_set(conf->mask, OPTION_JSON)) {
153 13 : memset(&fsize->chksum_array[fsize->nb_checksumed].hash_value, 0, sizeof(ft_hash_t));
154 13 : fsize->nb_checksumed++;
155 13 : napr_heap_insert(tmp_heap, file);
156 : }
157 : else {
158 93 : (*total_hash_tasks)++;
159 93 : fsize->nb_checksumed++;
160 : }
161 : }
162 : }
163 : else {
164 0 : DEBUG_ERR("inconsistency error found, no size[%" APR_OFF_T_FMT "] in hash for file %s", file->size, file->path);
165 0 : return APR_EGENERAL;
166 : }
167 : }
168 17 : return APR_SUCCESS;
169 : }
170 :
171 11 : static apr_status_t dispatch_hashing_tasks(ft_conf_t *conf, apr_pool_t *gc_pool, apr_size_t total_hash_tasks)
172 11 : {
173 11 : char errbuf[ERROR_BUFFER_SIZE];
174 : ft_fsize_t *fsize;
175 11 : napr_threadpool_t *threadpool = NULL;
176 : hashing_context_t h_ctx;
177 : apr_status_t status;
178 :
179 11 : h_ctx.conf = conf;
180 11 : h_ctx.pool = gc_pool;
181 11 : h_ctx.files_processed = 0;
182 11 : h_ctx.total_files = total_hash_tasks;
183 :
184 11 : status = apr_thread_mutex_create(&h_ctx.stats_mutex, APR_THREAD_MUTEX_DEFAULT, gc_pool);
185 11 : if (APR_SUCCESS != status) {
186 0 : DEBUG_ERR("error calling apr_thread_mutex_create: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
187 0 : return status;
188 : }
189 :
190 11 : status = napr_threadpool_init(&threadpool, &h_ctx, conf->num_threads, hashing_worker_callback, gc_pool);
191 11 : if (APR_SUCCESS != status) {
192 0 : DEBUG_ERR("error calling napr_threadpool_init: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
193 0 : apr_thread_mutex_destroy(h_ctx.stats_mutex);
194 0 : return status;
195 : }
196 :
197 41 : for (napr_hash_index_t *hash_index = napr_hash_first(gc_pool, conf->sizes); hash_index;
198 30 : hash_index = napr_hash_next(hash_index)) {
199 30 : napr_hash_this(hash_index, NULL, NULL, (void **) &fsize);
200 :
201 30 : int should_hash = (fsize->nb_files > 2) && (0 != fsize->val);
202 30 : if (is_option_set(conf->mask, OPTION_JSON)) {
203 1 : should_hash = (fsize->nb_files >= 2);
204 : }
205 :
206 30 : if (should_hash) {
207 123 : for (apr_uint32_t idx = 0; idx < fsize->nb_files; idx++) {
208 93 : if (NULL != fsize->chksum_array[idx].file) {
209 93 : hashing_task_t *task = apr_palloc(gc_pool, sizeof(hashing_task_t));
210 93 : task->fsize = fsize;
211 93 : task->index = idx;
212 :
213 93 : status = napr_threadpool_add(threadpool, task);
214 93 : if (APR_SUCCESS != status) {
215 0 : DEBUG_ERR("error calling napr_threadpool_add: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
216 0 : napr_threadpool_wait(threadpool);
217 0 : apr_thread_mutex_destroy(h_ctx.stats_mutex);
218 0 : return status;
219 : }
220 : }
221 : }
222 : }
223 : }
224 :
225 11 : napr_threadpool_wait(threadpool);
226 :
227 11 : status = apr_thread_mutex_destroy(h_ctx.stats_mutex);
228 11 : if (APR_SUCCESS != status) {
229 0 : DEBUG_ERR("error calling apr_thread_mutex_destroy: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
230 : }
231 :
232 11 : if (is_option_set(conf->mask, OPTION_VERBO)) {
233 0 : (void) fprintf(stderr, "\n");
234 : }
235 :
236 11 : status = napr_threadpool_shutdown(threadpool);
237 11 : if (APR_SUCCESS != status) {
238 0 : DEBUG_ERR("error calling napr_threadpool_shutdown: %s", apr_strerror(status, errbuf, ERROR_BUFFER_SIZE));
239 : }
240 :
241 11 : return APR_SUCCESS;
242 : }
243 :
244 11 : static apr_status_t collect_hashing_results(ft_conf_t *conf, napr_heap_t *tmp_heap, apr_pool_t *gc_pool)
245 : {
246 : ft_fsize_t *fsize;
247 :
248 41 : for (napr_hash_index_t *hash_index = napr_hash_first(gc_pool, conf->sizes); hash_index;
249 30 : hash_index = napr_hash_next(hash_index)) {
250 30 : napr_hash_this(hash_index, NULL, NULL, (void **) &fsize);
251 :
252 30 : int should_insert = (fsize->nb_files > 2) && (0 != fsize->val);
253 30 : if (is_option_set(conf->mask, OPTION_JSON)) {
254 1 : should_insert = (fsize->nb_files >= 2);
255 : }
256 :
257 30 : if (should_insert) {
258 123 : for (apr_uint32_t idx = 0; idx < fsize->nb_files; idx++) {
259 93 : if (NULL != fsize->chksum_array[idx].file) {
260 93 : napr_heap_insert(tmp_heap, fsize->chksum_array[idx].file);
261 : }
262 : }
263 : }
264 : }
265 :
266 11 : return APR_SUCCESS;
267 : }
|