import-merge.c 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716
  1. /** BEGIN COPYRIGHT BLOCK
  2. * This Program is free software; you can redistribute it and/or modify it under
  3. * the terms of the GNU General Public License as published by the Free Software
  4. * Foundation; version 2 of the License.
  5. *
  6. * This Program is distributed in the hope that it will be useful, but WITHOUT
  7. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  8. * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  9. *
  10. * You should have received a copy of the GNU General Public License along with
  11. * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
  12. * Place, Suite 330, Boston, MA 02111-1307 USA.
  13. *
  14. * In addition, as a special exception, Red Hat, Inc. gives You the additional
  15. * right to link the code of this Program with code not covered under the GNU
  16. * General Public License ("Non-GPL Code") and to distribute linked combinations
  17. * including the two, subject to the limitations in this paragraph. Non-GPL Code
  18. * permitted under this exception must only link to the code of this Program
  19. * through those well defined interfaces identified in the file named EXCEPTION
  20. * found in the source code files (the "Approved Interfaces"). The files of
  21. * Non-GPL Code may instantiate templates or use macros or inline functions from
  22. * the Approved Interfaces without causing the resulting work to be covered by
  23. * the GNU General Public License. Only Red Hat, Inc. may make changes or
  24. * additions to the list of Approved Interfaces. You must obey the GNU General
  25. * Public License in all respects for all of the Program code and other code used
  26. * in conjunction with the Program except the Non-GPL Code covered by this
  27. * exception. If you modify this file, you may extend this exception to your
  28. * version of the file, but you are not obligated to do so. If you do not wish to
  29. * provide this exception without modification, you must delete this exception
  30. * statement from your version and license this file solely under the GPL without
  31. * exception.
  32. *
  33. *
  34. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  35. * Copyright (C) 2005 Red Hat, Inc.
  36. * All rights reserved.
  37. * END COPYRIGHT BLOCK **/
  38. #ifdef HAVE_CONFIG_H
  39. # include <config.h>
  40. #endif
  41. /*
  42. * this is a bunch of routines for merging groups of db files together --
  43. * currently it's only used for imports (when we import into several small
  44. * db sets for speed, then merge them).
  45. */
  46. #include "back-ldbm.h"
  47. #include "import.h"
  48. struct _import_merge_thang
  49. {
  50. int type;
  51. #define IMPORT_MERGE_THANG_IDL 1 /* Values for type */
  52. #define IMPORT_MERGE_THANG_VLV 2
  53. union {
  54. IDList *idl; /* if type == IMPORT_MERGE_THANG_IDL */
  55. DBT vlv_data; /* if type == IMPORT_MERGE_THANG_VLV */
  56. } payload;
  57. };
  58. typedef struct _import_merge_thang import_merge_thang;
  59. struct _import_merge_queue_entry
  60. {
  61. int *file_referenced_list;
  62. import_merge_thang thang;
  63. DBT key;
  64. struct _import_merge_queue_entry *next;
  65. };
  66. typedef struct _import_merge_queue_entry import_merge_queue_entry;
  67. static int import_merge_get_next_thang(backend *be, DBC *cursor, DB *db, import_merge_thang *thang, DBT *key, int type)
  68. {
  69. int ret = 0;
  70. DBT value = {0};
  71. value.flags = DB_DBT_MALLOC;
  72. key->flags = DB_DBT_MALLOC;
  73. thang->type = type;
  74. if (IMPORT_MERGE_THANG_IDL == type) {
  75. /* IDL case */
  76. around:
  77. ret = cursor->c_get(cursor, key, &value, DB_NEXT_NODUP);
  78. if (0 == ret) {
  79. /* Check that we've not reached the beginning of continuation
  80. * blocks */
  81. if (CONT_PREFIX != ((char*)key->data)[0]) {
  82. /* If not, read the IDL using idl_fetch() */
  83. key->flags = DB_DBT_REALLOC;
  84. ret = NEW_IDL_NO_ALLID;
  85. thang->payload.idl = idl_fetch(be, db, key, NULL, NULL, &ret);
  86. PR_ASSERT(NULL != thang->payload.idl);
  87. } else {
  88. slapi_ch_free(&(value.data));
  89. slapi_ch_free(&(key->data));
  90. key->flags = DB_DBT_MALLOC;
  91. goto around; /* Just skip these */
  92. }
  93. slapi_ch_free(&(value.data));
  94. } else {
  95. if (DB_NOTFOUND == ret) {
  96. /* This means that we're at the end of the file */
  97. ret = EOF;
  98. }
  99. }
  100. } else {
  101. /* VLV case */
  102. ret = cursor->c_get(cursor,key,&value,DB_NEXT);
  103. if (0 == ret) {
  104. thang->payload.vlv_data = value;
  105. thang->payload.vlv_data.flags = 0;
  106. key->flags = 0;
  107. } else {
  108. if (DB_NOTFOUND == ret) {
  109. /* This means that we're at the end of the file */
  110. ret = EOF;
  111. }
  112. }
  113. }
  114. return ret;
  115. }
  116. static import_merge_queue_entry *import_merge_make_new_queue_entry(import_merge_thang *thang, DBT *key, int fileno, int passes)
  117. {
  118. /* Make a new entry */
  119. import_merge_queue_entry *new_entry = (import_merge_queue_entry *)slapi_ch_calloc(1, sizeof(import_merge_queue_entry));
  120. if (NULL == new_entry) {
  121. return NULL;
  122. }
  123. new_entry->key = *key;
  124. new_entry->thang = *thang;
  125. new_entry->file_referenced_list =
  126. (int *)slapi_ch_calloc(passes, sizeof(fileno));
  127. if (NULL == new_entry->file_referenced_list) {
  128. return NULL;
  129. }
  130. (new_entry->file_referenced_list)[fileno] = 1;
  131. return new_entry;
  132. }
  133. /* Put an IDL onto the priority queue */
  134. static int import_merge_insert_input_queue(backend *be, import_merge_queue_entry **queue,int fileno, DBT *key, import_merge_thang *thang,int passes)
  135. {
  136. /* Walk the list, looking for a key value which is greater than or equal
  137. * to the presented key */
  138. /* If an equal key is found, compute the union of the IDLs and store that
  139. * back in the queue entry */
  140. /* If a key greater than is found, or no key greater than is found, insert
  141. * a new queue entry */
  142. import_merge_queue_entry *current_entry = NULL;
  143. import_merge_queue_entry *previous_entry = NULL;
  144. PR_ASSERT(NULL != thang);
  145. if (NULL == *queue) {
  146. /* Queue was empty--- put ourselves at the head */
  147. *queue = import_merge_make_new_queue_entry(thang,key,fileno,passes);
  148. if (NULL == *queue) {
  149. return -1;
  150. }
  151. } else {
  152. for (current_entry = *queue; current_entry != NULL;
  153. current_entry = current_entry->next) {
  154. int cmp = strcmp(key->data,current_entry->key.data);
  155. if (0 == cmp) {
  156. if (IMPORT_MERGE_THANG_IDL == thang->type) { /* IDL case */
  157. IDList *idl = thang->payload.idl;
  158. /* Equal --- merge into the stored IDL, add file ID
  159. * to the list */
  160. IDList *new_idl =
  161. idl_union(be, current_entry->thang.payload.idl, idl);
  162. idl_free(current_entry->thang.payload.idl);
  163. idl_free(idl);
  164. current_entry->thang.payload.idl = new_idl;
  165. /* Add this file id into the entry's referenced list */
  166. (current_entry->file_referenced_list)[fileno] = 1;
  167. /* Because we merged the entries, we no longer need the
  168. * key, so free it */
  169. slapi_ch_free(&(key->data));
  170. goto done;
  171. } else {
  172. /* VLV case, we can see exact keys, this is not a bug ! */
  173. /* We want to ensure that they key read most recently is
  174. * put later in the queue than any others though */
  175. }
  176. } else {
  177. if (cmp < 0) {
  178. /* We compare smaller than the stored key, so we should
  179. * insert ourselves before this entry */
  180. break;
  181. } else {
  182. /* We compare greater than this entry, so we should keep
  183. * going */ ;
  184. }
  185. }
  186. previous_entry = current_entry;
  187. }
  188. /* Now insert */
  189. {
  190. import_merge_queue_entry *new_entry =
  191. import_merge_make_new_queue_entry(thang, key, fileno, passes);
  192. if (NULL == new_entry) {
  193. return -1;
  194. }
  195. /* If not, then we must need to insert ourselves after the last
  196. * entry */
  197. new_entry->next = current_entry;
  198. if (NULL == previous_entry) {
  199. *queue = new_entry;
  200. } else {
  201. previous_entry->next = new_entry;
  202. }
  203. }
  204. }
  205. done:
  206. return 0;
  207. }
  208. static int import_merge_remove_input_queue(backend *be, import_merge_queue_entry **queue, import_merge_thang *thang,DBT *key,DBC **input_cursors, DB **input_files,int passes)
  209. {
  210. import_merge_queue_entry *head = NULL;
  211. int file_referenced = 0;
  212. int i = 0;
  213. int ret = 0;
  214. PR_ASSERT(NULL != queue);
  215. head = *queue;
  216. if (head == NULL) {
  217. /* Means we've exhausted the queue---we're done */
  218. return EOF;
  219. }
  220. /* Remove the head of the queue */
  221. *queue = head->next;
  222. /* Get the IDL */
  223. *thang = head->thang;
  224. *key = head->key;
  225. PR_ASSERT(NULL != thang);
  226. /* Walk the list of referenced files, reading in the next IDL from each
  227. * one to the queue */
  228. for (i = 0 ; i < passes; i++) {
  229. import_merge_thang new_thang = {0};
  230. DBT new_key = {0};
  231. file_referenced = (head->file_referenced_list)[i];
  232. if (file_referenced) {
  233. ret = import_merge_get_next_thang(be, input_cursors[i],
  234. input_files[i], &new_thang, &new_key, thang->type);
  235. if (0 != ret) {
  236. if (EOF == ret) {
  237. /* Means that we walked off the end of the list,
  238. * do nothing */
  239. ret = 0;
  240. } else {
  241. /* Some other error */
  242. break;
  243. }
  244. } else {
  245. /* This function is responsible for any freeing needed */
  246. import_merge_insert_input_queue(be, queue, i, &new_key,
  247. &new_thang, passes);
  248. }
  249. }
  250. }
  251. slapi_ch_free( (void**)&(head->file_referenced_list));
  252. slapi_ch_free( (void**)&head);
  253. return ret;
  254. }
  255. static int import_merge_open_input_cursors(DB**files, int passes, DBC ***cursors)
  256. {
  257. int i = 0;
  258. int ret = 0;
  259. *cursors = (DBC**)slapi_ch_calloc(passes,sizeof(DBC*));
  260. if (NULL == *cursors) {
  261. return -1;
  262. }
  263. for (i = 0; i < passes; i++) {
  264. DB *pDB = files[i];
  265. DBC *pDBC = NULL;
  266. if (NULL != pDB) {
  267. /* Try to open a cursor onto the file */
  268. ret = pDB->cursor(pDB,NULL,&pDBC,0);
  269. if (0 != ret) {
  270. break;
  271. } else {
  272. (*cursors)[i] = pDBC;
  273. }
  274. }
  275. }
  276. return ret;
  277. }
  278. static int import_count_merge_input_files(ldbm_instance *inst,
  279. char *indexname, int passes, int *number_found, int *pass_number)
  280. {
  281. int i = 0;
  282. int found_one = 0;
  283. *number_found = 0;
  284. *pass_number = 0;
  285. for (i = 0; i < passes; i++) {
  286. int fd;
  287. char *filename = slapi_ch_smprintf("%s/%s.%d%s", inst->inst_dir_name, indexname, i+1,
  288. LDBM_FILENAME_SUFFIX);
  289. if (NULL == filename) {
  290. return -1;
  291. }
  292. fd = dblayer_open_huge_file(filename, O_RDONLY, 0);
  293. slapi_ch_free( (void**)&filename);
  294. if (fd >= 0) {
  295. close(fd);
  296. if (found_one == 0) {
  297. *pass_number = i+1;
  298. }
  299. found_one = 1;
  300. (*number_found)++;
  301. } else {
  302. ; /* Not finding a file is OK */
  303. }
  304. }
  305. return 0;
  306. }
  307. static int import_open_merge_input_files(backend *be, IndexInfo *index_info,
  308. int passes, DB ***input_files, int *number_found, int *pass_number)
  309. {
  310. int i = 0;
  311. int ret = 0;
  312. int found_one = 0;
  313. *number_found = 0;
  314. *pass_number = 0;
  315. *input_files = (DB**)slapi_ch_calloc(passes,sizeof(DB*));
  316. if (NULL == *input_files) {
  317. /* Memory allocation error */
  318. return -1;
  319. }
  320. for (i = 0; i < passes; i++) {
  321. DB *pDB = NULL;
  322. char *filename = slapi_ch_smprintf("%s.%d", index_info->name, i+1);
  323. if (NULL == filename) {
  324. return -1;
  325. }
  326. if (vlv_isvlv(filename)) {
  327. /* not sure why the file would be marked as a vlv index but
  328. not the index configuration . . . but better make sure
  329. the new code works with the old semantics */
  330. int saved_mask = index_info->ai->ai_indexmask;
  331. index_info->ai->ai_indexmask |= INDEX_VLV;
  332. ret = dblayer_open_file(be, filename, 0, index_info->ai, &pDB);
  333. index_info->ai->ai_indexmask = saved_mask;
  334. } else {
  335. ret = dblayer_open_file(be, filename, 0, index_info->ai, &pDB);
  336. }
  337. slapi_ch_free( (void**)&filename);
  338. if (0 == ret) {
  339. if (found_one == 0) {
  340. *pass_number = i+1;
  341. }
  342. found_one = 1;
  343. (*number_found)++;
  344. (*input_files)[i] = pDB;
  345. } else {
  346. if (ENOENT == ret) {
  347. ret = 0; /* Not finding a file is OK */
  348. } else {
  349. break;
  350. }
  351. }
  352. }
  353. return ret;
  354. }
  355. /* Performs the n-way merge on one file */
  356. static int import_merge_one_file(ImportWorkerInfo *worker, int passes,
  357. int *key_count)
  358. {
  359. ldbm_instance *inst = worker->job->inst;
  360. backend *be = inst->inst_be;
  361. DB *output_file = NULL;
  362. int ret = 0;
  363. int preclose_ret = 0;
  364. int number_found = 0;
  365. int pass_number = 0;
  366. PR_ASSERT(NULL != inst);
  367. /* Try to open all the input files.
  368. If we can't open file a file, we assume that is
  369. because there was no data in it. */
  370. ret = import_count_merge_input_files(inst, worker->index_info->name,
  371. passes, &number_found, &pass_number);
  372. if (0 != ret) {
  373. goto error;
  374. }
  375. /* If there were no input files, then we're finished ! */
  376. if (0 == number_found) {
  377. ret = 0;
  378. goto error;
  379. }
  380. /* Special-case where there's only one input file---just rename it */
  381. if (1 == number_found) {
  382. char *newname = NULL;
  383. char *oldname = NULL;
  384. ret = import_make_merge_filenames(inst->inst_dir_name,
  385. worker->index_info->name, pass_number, &oldname, &newname);
  386. if (0 != ret) {
  387. import_log_notice(worker->job, "Failed making filename in merge");
  388. goto error;
  389. }
  390. ret = PR_Rename(newname,oldname);
  391. if (0 != ret) {
  392. PRErrorCode prerr = PR_GetError();
  393. import_log_notice(worker->job, "Failed to rename file \"%s\" to \"%s\" "
  394. "in merge, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)",
  395. oldname, newname, prerr, slapd_pr_strerror(prerr));
  396. slapi_ch_free( (void**)&newname);
  397. slapi_ch_free( (void**)&oldname);
  398. goto error;
  399. }
  400. slapi_ch_free( (void**)&newname);
  401. slapi_ch_free( (void**)&oldname);
  402. *key_count = -1;
  403. } else {
  404. /* We really need to merge */
  405. import_merge_queue_entry *merge_queue = NULL;
  406. DB **input_files = NULL;
  407. DBC **input_cursors = NULL;
  408. DBT key = {0};
  409. import_merge_thang thang = {0};
  410. int i = 0;
  411. int not_finished = 1;
  412. int vlv_index = (INDEX_VLV == worker->index_info->ai->ai_indexmask);
  413. #if 0
  414. /* Close and re-open regions, bugs otherwise */
  415. ret = dblayer_close(inst->inst_li, DBLAYER_IMPORT_MODE);
  416. if (0 != ret) {
  417. if (ENOSPC == ret) {
  418. import_log_notice(worker->job, "FAILED: NO DISK SPACE LEFT");
  419. } else {
  420. import_log_notice(worker->job, "MERGE FAIL 8 %d", ret);
  421. }
  422. return ret;
  423. }
  424. ret = dblayer_start(inst->inst_li, DBLAYER_IMPORT_MODE);
  425. if (0 != ret) {
  426. import_log_notice(worker->job, "MERGE FAIL 9");
  427. return ret;
  428. }
  429. ret = dblayer_instance_start(be, DBLAYER_IMPORT_MODE);
  430. if (0 != ret) {
  431. import_log_notice(worker->job, "MERGE FAIL 9A");
  432. return ret;
  433. }
  434. #else
  435. /* we have reason to believe that it's okay to leave the region files
  436. * open in db3.x, since they track which files are opened and closed.
  437. * if we had to close the region files, we'd have to take down the
  438. * whole backend and defeat the purpose of an online import ---
  439. * baaad medicine.
  440. */
  441. ret = dblayer_instance_close(be);
  442. if (0 != ret) {
  443. import_log_notice(worker->job, "MERGE FAIL 8i %d\n", ret);
  444. return ret;
  445. }
  446. ret = dblayer_instance_start(be, DBLAYER_IMPORT_MODE);
  447. if (0 != ret) {
  448. import_log_notice(worker->job, "MERGE FAIL 8j %d\n", ret);
  449. return ret;
  450. }
  451. #endif
  452. ret = import_open_merge_input_files(be, worker->index_info,
  453. passes, &input_files, &number_found, &pass_number);
  454. if (0 != ret) {
  455. import_log_notice(worker->job, "MERGE FAIL 10");
  456. return ret;
  457. }
  458. ret = dblayer_open_file(be, worker->index_info->name, 1,
  459. worker->index_info->ai, &output_file);
  460. if (0 != ret) {
  461. import_log_notice(worker->job, "Failed to open output file for "
  462. "index %s in merge", worker->index_info->name);
  463. goto error;
  464. }
  465. /* OK, so we now have input and output files open and can proceed to
  466. * merge */
  467. /* We want to pre-fill the input IDL queue */
  468. /* Open cursors onto the input files */
  469. ret = import_merge_open_input_cursors(input_files, passes,
  470. &input_cursors);
  471. if (0 != ret) {
  472. import_log_notice(worker->job, "MERGE FAIL 2 %s %d",
  473. worker->index_info->name, ret);
  474. goto error;
  475. }
  476. /* Now read from the first location in each file and insert into the
  477. * queue */
  478. for (i = 0; i < passes; i++) if (input_files[i]) {
  479. import_merge_thang prime_thang = {0};
  480. /* Read an IDL from the file */
  481. ret = import_merge_get_next_thang(be, input_cursors[i],
  482. input_files[i], &prime_thang, &key,
  483. vlv_index ? IMPORT_MERGE_THANG_VLV : IMPORT_MERGE_THANG_IDL);
  484. if (0 != ret) {
  485. import_log_notice(worker->job, "MERGE FAIL 1 %s %d",
  486. worker->index_info->name, ret);
  487. goto error;
  488. }
  489. /* Put it on the queue */
  490. ret = import_merge_insert_input_queue(be, &merge_queue, i,& key,
  491. &prime_thang, passes);
  492. if (0 != ret) {
  493. import_log_notice(worker->job, "MERGE FAIL 0 %s",
  494. worker->index_info->name);
  495. goto error;
  496. }
  497. }
  498. /* We now have a pre-filled queue, so we may now proceed to remove the
  499. head entry and write it to the output file, and repeat this process
  500. until we've finished reading all the input data */
  501. while (not_finished && (0 == ret) ) {
  502. ret = import_merge_remove_input_queue(be, &merge_queue, &thang,
  503. &key, input_cursors, input_files, passes);
  504. if (0 != ret) {
  505. /* Have we finished cleanly ? */
  506. if (EOF == ret) {
  507. not_finished = 0;
  508. } else {
  509. import_log_notice(worker->job, "MERGE FAIL 3 %s, %d",
  510. worker->index_info->name, ret);
  511. }
  512. } else {
  513. /* Write it out */
  514. (*key_count)++;
  515. if (vlv_index) {
  516. /* Write the vlv index */
  517. ret = output_file->put(output_file, NULL, &key,
  518. &(thang.payload.vlv_data),0);
  519. slapi_ch_free(&(thang.payload.vlv_data.data));
  520. thang.payload.vlv_data.data = NULL;
  521. } else {
  522. /* Write the IDL index */
  523. ret = idl_store_block(be, output_file, &key,
  524. thang.payload.idl, NULL, worker->index_info->ai);
  525. /* Free the key we got back from the queue */
  526. idl_free(thang.payload.idl);
  527. thang.payload.idl = NULL;
  528. }
  529. slapi_ch_free(&(key.data));
  530. key.data = NULL;
  531. if (0 != ret) {
  532. /* Failed to write--- most obvious cause being out of
  533. disk space, let's make sure that we at least print a
  534. sensible error message right here. The caller should
  535. really handle this properly, but we're always bad at
  536. this. */
  537. if (ret == DB_RUNRECOVERY || ret == ENOSPC) {
  538. import_log_notice(worker->job, "OUT OF SPACE ON DISK, "
  539. "failed writing index file %s",
  540. worker->index_info->name);
  541. } else {
  542. import_log_notice(worker->job, "Failed to write "
  543. "index file %s, errno=%d (%s)\n",
  544. worker->index_info->name, errno,
  545. dblayer_strerror(errno));
  546. }
  547. }
  548. }
  549. }
  550. preclose_ret = ret;
  551. /* Now close the files */
  552. dblayer_close_file(output_file);
  553. /* Close the cursors */
  554. /* Close and delete the files */
  555. for (i = 0; i < passes; i++) {
  556. DBC *cursor = input_cursors[i];
  557. DB *db = input_files[i];
  558. if (NULL != db) {
  559. PR_ASSERT(NULL != cursor);
  560. ret = cursor->c_close(cursor);
  561. if (0 != ret) {
  562. import_log_notice(worker->job, "MERGE FAIL 4");
  563. }
  564. ret = dblayer_close_file(db);
  565. if (0 != ret) {
  566. import_log_notice(worker->job, "MERGE FAIL 5");
  567. }
  568. /* Now make the filename and delete the file */
  569. {
  570. char *newname = NULL;
  571. char *oldname = NULL;
  572. ret = import_make_merge_filenames(inst->inst_dir_name,
  573. worker->index_info->name, i+1, &oldname, &newname);
  574. if (0 != ret) {
  575. import_log_notice(worker->job, "MERGE FAIL 6");
  576. } else {
  577. ret = PR_Delete(newname);
  578. if (0 != ret) {
  579. import_log_notice(worker->job, "MERGE FAIL 7");
  580. }
  581. slapi_ch_free( (void**)&newname);
  582. slapi_ch_free( (void**)&oldname);
  583. }
  584. }
  585. }
  586. }
  587. if (preclose_ret != 0) ret = preclose_ret;
  588. slapi_ch_free( (void**)&input_files);
  589. slapi_ch_free( (void**)&input_cursors);
  590. }
  591. if (EOF == ret) {
  592. ret = 0;
  593. }
  594. error:
  595. return ret;
  596. }
  597. /********** the real deal here: **********/
  598. /* Our mission here is as follows:
  599. * for each index job except entrydn and id2entry:
  600. * open all the pass files
  601. * open a new output file
  602. * iterate cursors over all of the input files picking each distinct
  603. * key and combining the input IDLs into a merged IDL. Put that
  604. * IDL to the output file.
  605. */
  606. int import_mega_merge(ImportJob *job)
  607. {
  608. ImportWorkerInfo *current_worker = NULL;
  609. int ret = 0;
  610. time_t beginning = 0;
  611. time_t end = 0;
  612. int passes = job->current_pass;
  613. if (1 == job->number_indexers) {
  614. import_log_notice(job, "Beginning %d-way merge of one file...", passes);
  615. } else {
  616. import_log_notice(job, "Beginning %d-way merge of up to %lu files...",
  617. passes, job->number_indexers);
  618. }
  619. time(&beginning);
  620. /* Iterate over the files */
  621. for (current_worker = job->worker_list;
  622. (ret == 0) && (current_worker != NULL);
  623. current_worker = current_worker->next) {
  624. /* We need to ignore the primary index */
  625. if ((current_worker->work_type != FOREMAN) &&
  626. (current_worker->work_type != PRODUCER)) {
  627. time_t file_beginning = 0;
  628. time_t file_end = 0;
  629. int key_count = 0;
  630. time(&file_beginning);
  631. ret = import_merge_one_file(current_worker,passes,&key_count);
  632. time(&file_end);
  633. if (key_count == 0) {
  634. import_log_notice(job, "No files to merge for \"%s\".",
  635. current_worker->index_info->name);
  636. } else {
  637. if (-1 == key_count) {
  638. import_log_notice(job, "Merged \"%s\": Simple merge - "
  639. "file renamed.",
  640. current_worker->index_info->name);
  641. } else {
  642. import_log_notice(job, "Merged \"%s\": %d keys merged "
  643. "in %ld seconds.",
  644. current_worker->index_info->name,
  645. key_count, file_end-file_beginning);
  646. }
  647. }
  648. }
  649. }
  650. time(&end);
  651. if (0 == ret) {
  652. int seconds_to_merge = end - beginning;
  653. import_log_notice(job, "Merging completed in %d seconds.",
  654. seconds_to_merge);
  655. }
  656. return ret;
  657. }