import.c 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487
  1. /** BEGIN COPYRIGHT BLOCK
  2. * This Program is free software; you can redistribute it and/or modify it under
  3. * the terms of the GNU General Public License as published by the Free Software
  4. * Foundation; version 2 of the License.
  5. *
  6. * This Program is distributed in the hope that it will be useful, but WITHOUT
  7. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  8. * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  9. *
  10. * You should have received a copy of the GNU General Public License along with
  11. * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
  12. * Place, Suite 330, Boston, MA 02111-1307 USA.
  13. *
  14. * In addition, as a special exception, Red Hat, Inc. gives You the additional
  15. * right to link the code of this Program with code not covered under the GNU
  16. * General Public License ("Non-GPL Code") and to distribute linked combinations
  17. * including the two, subject to the limitations in this paragraph. Non-GPL Code
  18. * permitted under this exception must only link to the code of this Program
  19. * through those well defined interfaces identified in the file named EXCEPTION
  20. * found in the source code files (the "Approved Interfaces"). The files of
  21. * Non-GPL Code may instantiate templates or use macros or inline functions from
  22. * the Approved Interfaces without causing the resulting work to be covered by
  23. * the GNU General Public License. Only Red Hat, Inc. may make changes or
  24. * additions to the list of Approved Interfaces. You must obey the GNU General
  25. * Public License in all respects for all of the Program code and other code used
  26. * in conjunction with the Program except the Non-GPL Code covered by this
  27. * exception. If you modify this file, you may extend this exception to your
  28. * version of the file, but you are not obligated to do so. If you do not wish to
  29. * provide this exception without modification, you must delete this exception
  30. * statement from your version and license this file solely under the GPL without
  31. * exception.
  32. *
  33. *
  34. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  35. * Copyright (C) 2005 Red Hat, Inc.
  36. * All rights reserved.
  37. * END COPYRIGHT BLOCK **/
  38. #ifdef HAVE_CONFIG_H
  39. # include <config.h>
  40. #endif
  41. /*
  42. * the "new" ("deluxe") backend import code
  43. *
  44. * please make sure you use 4-space indentation on this file.
  45. */
  46. #include "back-ldbm.h"
  47. #include "vlv_srch.h"
  48. #include "import.h"
  49. #define ERR_IMPORT_ABORTED -23
  50. /********** routines to manipulate the entry fifo **********/
  51. /* this is pretty bogus -- could be a HUGE amount of memory */
  52. /* Not anymore with the Import Queue Adaptative Algorithm (Regulation) */
  53. #define MAX_FIFO_SIZE 8000
  54. static int import_fifo_init(ImportJob *job)
  55. {
  56. ldbm_instance *inst = job->inst;
  57. /* Work out how big the entry fifo can be */
  58. if (inst->inst_cache.c_maxentries > 0)
  59. job->fifo.size = inst->inst_cache.c_maxentries;
  60. else
  61. job->fifo.size = inst->inst_cache.c_maxsize / 1024; /* guess */
  62. /* byte limit that should be respected to avoid memory starvation */
  63. /* conservative computing: multiply by .8 to allow for reasonable overflow */
  64. job->fifo.bsize = (inst->inst_cache.c_maxsize/10) << 3;
  65. job->fifo.c_bsize = 0;
  66. if (job->fifo.size > MAX_FIFO_SIZE)
  67. job->fifo.size = MAX_FIFO_SIZE;
  68. /* has to be at least 1 or 2, and anything less than about 100 destroys
  69. * the point of doing all this optimization in the first place. */
  70. if (job->fifo.size < 100)
  71. job->fifo.size = 100;
  72. /* Get memory for the entry fifo */
  73. /* This is used to keep a ref'ed pointer to the last <cachesize>
  74. * processed entries */
  75. PR_ASSERT(NULL == job->fifo.item);
  76. job->fifo.item = (FifoItem *)slapi_ch_calloc(job->fifo.size,
  77. sizeof(FifoItem));
  78. if (NULL == job->fifo.item) {
  79. /* Memory allocation error */
  80. return -1;
  81. }
  82. return 0;
  83. }
  84. FifoItem *import_fifo_fetch(ImportJob *job, ID id, int worker)
  85. {
  86. int idx = id % job->fifo.size;
  87. FifoItem *fi;
  88. if (job->fifo.item) {
  89. fi = &(job->fifo.item[idx]);
  90. } else {
  91. return NULL;
  92. }
  93. if (fi->entry) {
  94. if (worker && fi->bad) {
  95. import_log_notice(job, "WARNING: bad entry: ID %d", id);
  96. return NULL;
  97. }
  98. PR_ASSERT(fi->entry->ep_refcnt > 0);
  99. }
  100. return fi;
  101. }
  102. static void import_fifo_destroy(ImportJob *job)
  103. {
  104. /* Free any entries in the fifo first */
  105. struct backentry *be = NULL;
  106. size_t i = 0;
  107. for (i = 0; i < job->fifo.size; i++) {
  108. be = job->fifo.item[i].entry;
  109. backentry_free(&be);
  110. job->fifo.item[i].entry = NULL;
  111. job->fifo.item[i].filename = NULL;
  112. }
  113. slapi_ch_free((void **)&job->fifo.item);
  114. job->fifo.item = NULL;
  115. }
  116. /********** logging stuff **********/
  117. #define LOG_BUFFER 256
  118. /* this changes the 'nsTaskStatus' value, which is transient (anything logged
  119. * here wipes out any previous status)
  120. */
  121. static void import_log_status_start(ImportJob *job)
  122. {
  123. if (! job->task_status)
  124. job->task_status = (char *)slapi_ch_malloc(10 * LOG_BUFFER);
  125. if (! job->task_status)
  126. return; /* out of memory? */
  127. job->task_status[0] = 0;
  128. }
  129. static void import_log_status_add_line(ImportJob *job, char *format, ...)
  130. {
  131. va_list ap;
  132. int len = 0;
  133. if (! job->task_status)
  134. return;
  135. len = strlen(job->task_status);
  136. if (len + 5 > (10 * LOG_BUFFER))
  137. return; /* no room */
  138. if (job->task_status[0])
  139. strcat(job->task_status, "\n");
  140. va_start(ap, format);
  141. PR_vsnprintf(job->task_status + len, (10 * LOG_BUFFER) - len, format, ap);
  142. va_end(ap);
  143. }
  144. static void import_log_status_done(ImportJob *job)
  145. {
  146. if (job->task) {
  147. slapi_task_log_status(job->task, "%s", job->task_status);
  148. }
  149. }
  150. /* this adds a line to the 'nsTaskLog' value, which is cumulative (anything
  151. * logged here is added to the end)
  152. */
  153. void import_log_notice(ImportJob *job, char *format, ...)
  154. {
  155. va_list ap;
  156. char buffer[LOG_BUFFER];
  157. va_start(ap, format);
  158. PR_vsnprintf(buffer, LOG_BUFFER, format, ap);
  159. va_end(ap);
  160. if (job->task) {
  161. slapi_task_log_notice(job->task, "%s", buffer);
  162. }
  163. /* also save it in the logs for posterity */
  164. LDAPDebug(LDAP_DEBUG_ANY, "import %s: %s\n", job->inst->inst_name,
  165. buffer, 0);
  166. }
  167. static void import_task_destroy(Slapi_Task *task)
  168. {
  169. ImportJob *job = (ImportJob *)slapi_task_get_data(task);
  170. if (job && job->task_status) {
  171. slapi_ch_free((void **)&job->task_status);
  172. job->task_status = NULL;
  173. }
  174. FREE(job);
  175. slapi_task_set_data(task, NULL);
  176. }
  177. static void import_task_abort(Slapi_Task *task)
  178. {
  179. ImportJob *job;
  180. /* don't log anything from here, because we're still holding the
  181. * DSE lock for modify...
  182. */
  183. if (slapi_task_get_state(task) == SLAPI_TASK_FINISHED) {
  184. /* too late */
  185. }
  186. /*
  187. * Race condition.
  188. * If the import thread happens to finish right now we're in trouble
  189. * because it will free the job.
  190. */
  191. job = (ImportJob *)slapi_task_get_data(task);
  192. import_abort_all(job, 0);
  193. while (slapi_task_get_state(task) != SLAPI_TASK_FINISHED)
  194. DS_Sleep(PR_MillisecondsToInterval(100));
  195. }
  196. /********** helper functions for importing **********/
  197. /* Function used to gather a list of indexed attrs */
  198. static int import_attr_callback(void *node, void *param)
  199. {
  200. ImportJob *job = (ImportJob *)param;
  201. struct attrinfo *a = (struct attrinfo *)node;
  202. /* OK, so we now have hold of the attribute structure and the job info,
  203. * let's see what we have. Remember that although this function is called
  204. * many times, all these calls are in the context of a single thread, so we
  205. * don't need to worry about protecting the data in the job structure.
  206. */
  207. /* We need to specifically exclude the entrydn & parentid indexes because
  208. * we build those in the foreman thread.
  209. */
  210. if (IS_INDEXED(a->ai_indexmask) &&
  211. (strcasecmp(a->ai_type, "entrydn") != 0) &&
  212. (strcasecmp(a->ai_type, "parentid") != 0) &&
  213. (strcasecmp(a->ai_type, "ancestorid") != 0) &&
  214. (strcasecmp(a->ai_type, numsubordinates) != 0)) {
  215. /* Make an import_index_info structure, fill it in and insert into the
  216. * job's list */
  217. IndexInfo *info = CALLOC(IndexInfo);
  218. if (NULL == info) {
  219. /* Memory allocation error */
  220. return -1;
  221. }
  222. info->name = slapi_ch_strdup(a->ai_type);
  223. info->ai = a;
  224. if (NULL == info->name) {
  225. /* Memory allocation error */
  226. FREE(info);
  227. return -1;
  228. }
  229. info->next = job->index_list;
  230. job->index_list = info;
  231. job->number_indexers++;
  232. }
  233. return 0;
  234. }
  235. static void import_set_index_buffer_size(ImportJob *job)
  236. {
  237. IndexInfo *current_index = NULL;
  238. size_t substring_index_count = 0;
  239. size_t proposed_size = 0;
  240. /* Count the substring indexes we have */
  241. for (current_index = job->index_list; current_index != NULL;
  242. current_index = current_index->next) {
  243. if (current_index->ai->ai_indexmask & INDEX_SUB) {
  244. substring_index_count++;
  245. }
  246. }
  247. if (substring_index_count > 0) {
  248. /* Make proposed size such that if all substring indices were
  249. * reasonably full, we'd hit the target space */
  250. proposed_size = (job->job_index_buffer_size / substring_index_count) /
  251. IMPORT_INDEX_BUFFER_SIZE_CONSTANT;
  252. if (proposed_size > IMPORT_MAX_INDEX_BUFFER_SIZE) {
  253. proposed_size = IMPORT_MAX_INDEX_BUFFER_SIZE;
  254. }
  255. if (proposed_size < IMPORT_MIN_INDEX_BUFFER_SIZE) {
  256. proposed_size = 0;
  257. }
  258. }
  259. job->job_index_buffer_suggestion = proposed_size;
  260. }
  261. static void import_free_thread_data(ImportJob *job)
  262. {
  263. /* DBDB free the lists etc */
  264. ImportWorkerInfo *worker = job->worker_list;
  265. while (worker != NULL) {
  266. ImportWorkerInfo *asabird = worker;
  267. worker = worker->next;
  268. if (asabird->work_type != PRODUCER)
  269. slapi_ch_free( (void**)&asabird);
  270. }
  271. }
  272. void import_free_job(ImportJob *job)
  273. {
  274. /* DBDB free the lists etc */
  275. IndexInfo *index = job->index_list;
  276. import_free_thread_data(job);
  277. while (index != NULL) {
  278. IndexInfo *asabird = index;
  279. index = index->next;
  280. slapi_ch_free( (void**)&asabird->name);
  281. slapi_ch_free( (void**)&asabird);
  282. }
  283. job->index_list = NULL;
  284. if (NULL != job->mothers) {
  285. import_subcount_stuff_term(job->mothers);
  286. slapi_ch_free( (void**)&job->mothers);
  287. }
  288. ldbm_back_free_incl_excl(job->include_subtrees, job->exclude_subtrees);
  289. charray_free(job->input_filenames);
  290. if (job->fifo.size)
  291. import_fifo_destroy(job);
  292. if (NULL != job->uuid_namespace)
  293. slapi_ch_free((void **)&job->uuid_namespace);
  294. if (job->wire_lock)
  295. PR_DestroyLock(job->wire_lock);
  296. if (job->wire_cv)
  297. PR_DestroyCondVar(job->wire_cv);
  298. slapi_ch_free((void **)&job->task_status);
  299. }
  300. /* determine if we are the correct backend for this entry
  301. * (in a distributed suffix, some entries may be for other backends).
  302. * if the entry's dn actually matches one of the suffixes of the be, we
  303. * automatically take it as a belonging one, for such entries must be
  304. * present in EVERY backend independently of the distribution applied.
  305. */
  306. int import_entry_belongs_here(Slapi_Entry *e, backend *be)
  307. {
  308. Slapi_Backend *retbe;
  309. Slapi_DN *sdn = slapi_entry_get_sdn(e);
  310. if (slapi_be_issuffix(be, sdn))
  311. return 1;
  312. retbe = slapi_mapping_tree_find_backend_for_sdn(sdn);
  313. return (retbe == be);
  314. }
  315. /********** starting threads and stuff **********/
  316. /* Solaris is weird---we need an LWP per thread but NSPR doesn't give us
  317. * one unless we make this magic belshe-call */
  318. /* Fixed on Solaris 8; NSPR supports PR_GLOBAL_BOUND_THREAD */
  319. #define CREATE_THREAD PR_CreateThread
  320. static void import_init_worker_info(ImportWorkerInfo *info, ImportJob *job)
  321. {
  322. info->command = PAUSE;
  323. info->job = job;
  324. info->first_ID = job->first_ID;
  325. info->index_buffer_size = job->job_index_buffer_suggestion;
  326. }
/* Spawn the threads for one import pass: the foreman, plus (when
 * FLAG_INDEX_ATTRS is set) one worker per attribute index on
 * job->index_list.  Every started thread is linked onto
 * job->worker_list.  Returns 0 on success; on any failure, aborts all
 * threads started so far and returns -1. */
static int import_start_threads(ImportJob *job)
{
    IndexInfo *current_index = NULL;
    ImportWorkerInfo *foreman = NULL, *worker = NULL;

    foreman = CALLOC(ImportWorkerInfo);
    if (!foreman)
        goto error;

    /* start the foreman */
    import_init_worker_info(foreman, job);
    foreman->work_type = FOREMAN;
    if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_foreman, foreman,
                        PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                        PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
        PRErrorCode prerr = PR_GetError();
        LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import foreman thread, "
                  SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                  prerr, slapd_pr_strerror(prerr), 0);
        /* foreman thread never started, so we still own this struct */
        FREE(foreman);
        goto error;
    }

    foreman->next = job->worker_list;
    job->worker_list = foreman;

    /* Start follower threads, if we are doing attribute indexing */
    current_index = job->index_list;
    if (job->flags & FLAG_INDEX_ATTRS) {
        while (current_index) {
            /* make a new thread info structure */
            worker = CALLOC(ImportWorkerInfo);
            if (! worker)
                goto error;
            /* fill it in */
            import_init_worker_info(worker, job);
            worker->index_info = current_index;
            worker->work_type = WORKER;
            /* Start the thread */
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_worker, worker,
                                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD,
                                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import worker thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                /* this worker never started; free it before unwinding */
                FREE(worker);
                goto error;
            }
            /* link it onto the job's thread list */
            worker->next = job->worker_list;
            job->worker_list = worker;
            current_index = current_index->next;
        }
    }
    return 0;

error:
    /* Any threads already on job->worker_list are told to abort. */
    import_log_notice(job, "Import thread creation failed.");
    import_log_notice(job, "Aborting all import threads...");
    import_abort_all(job, 1);
    import_log_notice(job, "Import threads aborted.");
    return -1;
}
  387. /********** monitoring the worker threads **********/
  388. static void import_clear_progress_history(ImportJob *job)
  389. {
  390. int i = 0;
  391. for (i = 0; i < IMPORT_JOB_PROG_HISTORY_SIZE /*- 1*/; i++) {
  392. job->progress_history[i] = job->first_ID;
  393. job->progress_times[i] = job->start_time;
  394. }
  395. /* reset libdb cache stats */
  396. job->inst->inst_cache_hits = job->inst->inst_cache_misses = 0;
  397. }
/* Sample the libdb memory-pool statistics for this instance and return
 * the cache hit ratio, hits / (hits + misses), computed over the
 * interval since the previous call (the instance caches the previous
 * totals in inst_cache_hits/inst_cache_misses).  Returns 0.0 on the
 * first call, when the stat call fails, or when no hits were recorded
 * in the interval. */
static double import_grok_db_stats(ldbm_instance *inst)
{
    DB_MPOOL_STAT *mpstat = NULL;
    DB_MPOOL_FSTAT **mpfstat = NULL;
    int return_value = -1;
    double cache_hit_ratio = 0.0;

    return_value = dblayer_memp_stat_instance(inst, &mpstat, &mpfstat);
    if (0 == return_value) {
        unsigned long current_cache_hits = mpstat->st_cache_hit;
        unsigned long current_cache_misses = mpstat->st_cache_miss;

        /* inst_cache_hits == 0 means no baseline yet (see
         * import_clear_progress_history), so skip the ratio this time. */
        if (inst->inst_cache_hits) {
            unsigned long hit_delta, miss_delta;

            hit_delta = current_cache_hits - inst->inst_cache_hits;
            miss_delta = current_cache_misses - inst->inst_cache_misses;
            if (hit_delta != 0) {
                cache_hit_ratio = (double)hit_delta /
                    (double)(hit_delta + miss_delta);
            }
        }
        /* remember the running totals as the next interval's baseline */
        inst->inst_cache_misses = current_cache_misses;
        inst->inst_cache_hits = current_cache_hits;
        if (mpstat)
            slapi_ch_free((void **)&mpstat);
        if (mpfstat) {
#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR + DB_VERSION_PATCH <= 3204
            /* In DB 3.2.4 and earlier, we need to free each element */
            DB_MPOOL_FSTAT **tfsp;
            for (tfsp = mpfstat; *tfsp; tfsp++)
                slapi_ch_free((void **)tfsp);
#endif
            slapi_ch_free((void **)&mpfstat);
        }
    }
    return cache_hit_ratio;
}
  433. static char* import_decode_worker_state(int state)
  434. {
  435. switch (state) {
  436. case WAITING:
  437. return "W";
  438. case RUNNING:
  439. return "R";
  440. case FINISHED:
  441. return "F";
  442. case ABORTED:
  443. return "A";
  444. default:
  445. return "?";
  446. }
  447. }
  448. static void import_print_worker_status(ImportWorkerInfo *info)
  449. {
  450. char *name = (info->work_type == PRODUCER ? "Producer" :
  451. (info->work_type == FOREMAN ? "Foreman" :
  452. info->index_info->name));
  453. import_log_status_add_line(info->job,
  454. "%-25s %s%10ld %7.1f", name,
  455. import_decode_worker_state(info->state),
  456. info->last_ID_processed, info->rate);
  457. }
  458. #define IMPORT_CHUNK_TEST_HOLDOFF_TIME (5*60) /* Seconds */
  459. /* Got to be lower than this: */
  460. #define IMPORT_CHUNK_TEST_CACHE_HIT_RATIO (0.99)
  461. /* Less than half as fast as we were doing: */
  462. #define IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A (0.5)
  463. /* A lot less fast than we were doing: */
  464. #define IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B (0.1)
/* Decide whether to end the current import pass early.  Returns 1 to
 * give up on this pass (multi-pass merge), 0 to keep going.  With an
 * explicit merge chunk size, purely size-based; otherwise uses recent
 * progress rate and cache hit ratio heuristics. */
static int import_throw_in_towel(ImportJob *job, time_t current_time,
                                 ID trailing_ID)
{
    static int number_of_times_here = 0;

    /* secret -c option allows specific chunk size to be set... */
    if (job->merge_chunk_size != 0) {
        if ((0 != job->lead_ID) &&
            (trailing_ID > job->first_ID) &&
            (trailing_ID - job->first_ID > job->merge_chunk_size)) {
            return 1;
        }
        return 0;
    }
    /* Check stats to decide whether we're getting bogged down and should
     * terminate this pass.
     */
    /* Check #1 : are we past the hold-off period (5 minutes) into the
     * chunk ? */
    if (current_time - job->start_time > IMPORT_CHUNK_TEST_HOLDOFF_TIME) {
        /* Check #2 : Have we slowed down considerably recently ? */
        if ((job->recent_progress_rate / job->average_progress_rate) <
            IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A) {
            /* Check #3: Cache performing poorly---the puported reason
             * for the slowdown */
            if (job->cache_hit_ratio < IMPORT_CHUNK_TEST_CACHE_HIT_RATIO) {
                /* We have a winner ! */
                import_log_notice(job, "Decided to end this pass because "
                                  "the progress rate has dropped below "
                                  "the %.0f%% threshold.",
                                  IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A*100.0);
                return 1;
            }
        } else {
            /* NOTE(review): this branch runs only when the ratio is
             * >= RATIO_A (0.5), so the RATIO_B (0.1) comparison below
             * can never be true, and number_of_times_here is reset to 0
             * at the bottom of every non-bailing call, so it can never
             * reach 10.  The "plummeted" bail-out appears unreachable
             * as written -- confirm the intended nesting before
             * relying on it. */
            if ((job->recent_progress_rate / job->average_progress_rate) <
                IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B) {
                /* Alternative check: have we really, really slowed down,
                 * without the test for cache overflow? */
                /* This is designed to catch the case where the cache has
                 * been misconfigured too large */
                if (number_of_times_here > 10) {
                    /* Got to get here ten times at least */
                    import_log_notice(job, "Decided to end this pass "
                                      "because the progress rate "
                                      "plummeted below %.0f%%",
                                      IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B*100.0);
                    return 1;
                }
                number_of_times_here++;
            }
        }
    }
    number_of_times_here = 0;
    return 0;
}
  518. static void import_push_progress_history(ImportJob *job, ID current_id,
  519. time_t current_time)
  520. {
  521. int i = 0;
  522. for (i = 0; i < IMPORT_JOB_PROG_HISTORY_SIZE - 1; i++) {
  523. job->progress_history[i] = job->progress_history[i+1];
  524. job->progress_times[i] = job->progress_times[i+1];
  525. }
  526. job->progress_history[i] = current_id;
  527. job->progress_times[i] = current_time;
  528. }
  529. static void import_calc_rate(ImportWorkerInfo *info, int time_interval)
  530. {
  531. size_t ids = info->last_ID_processed - info->previous_ID_counted;
  532. double rate = (double)ids / time_interval;
  533. if ( (info->previous_ID_counted != 0) && (info->last_ID_processed != 0) ) {
  534. info->rate = rate;
  535. } else {
  536. info->rate = 0;
  537. }
  538. info->previous_ID_counted = info->last_ID_processed;
  539. }
  540. /* find the rate (ids/time) of work from a worker thread between history
  541. * marks A and B.
  542. */
  543. #define HISTORY(N) (job->progress_history[N])
  544. #define TIMES(N) (job->progress_times[N])
  545. #define PROGRESS(A, B) ((HISTORY(B) > HISTORY(A)) ? \
  546. ((double)(HISTORY(B) - HISTORY(A)) / \
  547. (double)(TIMES(B) - TIMES(A))) : \
  548. (double)0)
/* Monitor loop for one import pass.  Sets every thread on
 * job->worker_list running, then polls every 100ms until the pass
 * completes, deciding when to report progress (every display_interval
 * polls, i.e. roughly every 20 seconds), when to give up on the pass
 * (import_throw_in_towel), and when all threads have caught up.  On
 * completion, *status is set to IMPORT_COMPLETE_PASS or
 * IMPORT_INCOMPLETE_PASS and 0 is returned; if any thread aborts,
 * returns ERR_IMPORT_ABORTED. */
static int import_monitor_threads(ImportJob *job, int *status)
{
    PRIntervalTime tenthsecond = PR_MillisecondsToInterval(100);
    ImportWorkerInfo *current_worker = NULL;
    ImportWorkerInfo *producer = NULL, *foreman = NULL;
    int finished = 0;
    int giveup = 0;
    int count = 1; /* 1 to prevent premature status report */
    int producer_done = 0;
    const int display_interval = 200; /* polls between reports (x 100ms) */
    time_t time_now = 0;
    time_t last_time = 0;
    time_t time_interval = 0;

    /* Set everyone running and locate the producer/foreman entries. */
    for (current_worker = job->worker_list; current_worker != NULL;
         current_worker = current_worker->next) {
        current_worker->command = RUN;
        if (current_worker->work_type == PRODUCER)
            producer = current_worker;
        if (current_worker->work_type == FOREMAN)
            foreman = current_worker;
    }

    /* Only a file-based (LDIF) import is guaranteed to have a producer. */
    if (job->flags & FLAG_USE_FILES)
        PR_ASSERT(producer != NULL);
    PR_ASSERT(foreman != NULL);

    time(&last_time);
    job->start_time = last_time;
    import_clear_progress_history(job);

    while (!finished) {
        /* trailing_ID tracks the slowest WORKER's position this poll */
        ID trailing_ID = NOID;

        DS_Sleep(tenthsecond);
        finished = 1;

        /* First calculate the time interval since last reported */
        if (0 == (count % display_interval)) {
            time(&time_now);
            time_interval = time_now - last_time;
            last_time = time_now;
            /* Now calculate our rate of progress overall for this chunk */
            if (time_now != job->start_time) {
                /* log a cute chart of the worker progress */
                import_log_status_start(job);
                import_log_status_add_line(job,
                    "Index status for import of %s:", job->inst->inst_name);
                import_log_status_add_line(job,
                    "-------Index Task-------State---Entry----Rate-");

                import_push_progress_history(job, foreman->last_ID_processed,
                                             time_now);
                /* overall rate: IDs processed since the pass began / time */
                job->average_progress_rate =
                    (double)(HISTORY(IMPORT_JOB_PROG_HISTORY_SIZE-1)+1 - foreman->first_ID) /
                    (double)(TIMES(IMPORT_JOB_PROG_HISTORY_SIZE-1) - job->start_time);
                /* recent rate: over the span of the history window */
                job->recent_progress_rate =
                    PROGRESS(0, IMPORT_JOB_PROG_HISTORY_SIZE-1);
                job->cache_hit_ratio = import_grok_db_stats(job->inst);
            }
        }

        for (current_worker = job->worker_list; current_worker != NULL;
             current_worker = current_worker->next) {
            /* Calculate the ID at which the slowest worker is currently
             * processing */
            if ((trailing_ID > current_worker->last_ID_processed) &&
                (current_worker->work_type == WORKER)) {
                trailing_ID = current_worker->last_ID_processed;
            }
            if (0 == (count % display_interval) && time_interval) {
                import_calc_rate(current_worker, time_interval);
                import_print_worker_status(current_worker);
            }
            if (current_worker->state != FINISHED) {
                finished = 0;
            }
            if (current_worker->state == ABORTED) {
                goto error_abort;
            }
        }

        /* Periodic one-line progress summary to the task log. */
        if ((0 == (count % display_interval)) &&
            (job->start_time != time_now)) {
            char buffer[256], *p = buffer;

            import_log_status_done(job);
            p += sprintf(p, "Processed %lu entries ", (u_long)job->ready_ID);
            if (job->total_pass > 1)
                p += sprintf(p, "(pass %d) ", job->total_pass);
            p += sprintf(p, "-- average rate %.1f/sec, ",
                         job->average_progress_rate);
            p += sprintf(p, "recent rate %.1f/sec, ",
                         job->recent_progress_rate);
            p += sprintf(p, "hit ratio %.0f%%", job->cache_hit_ratio * 100.0);
            import_log_notice(job, "%s", buffer);
        }

        /* Then let's see if it's time to complete this import pass */
        if (!giveup) {
            giveup = import_throw_in_towel(job, time_now, trailing_ID);
            if (giveup) {
                /* If so, signal the lead thread to stop */
                import_log_notice(job, "Ending pass number %d ...",
                                  job->total_pass);
                foreman->command = STOP;
                while (foreman->state != FINISHED) {
                    DS_Sleep(tenthsecond);
                }
                import_log_notice(job, "Foreman is done; waiting for "
                                  "workers to finish...");
            }
        }

        /* if the producer is finished, and the foreman has caught up... */
        if (producer) {
            producer_done = (producer->state == FINISHED);
        } else {
            /* no producer thread: rely on the job flag instead */
            producer_done = (job->flags & FLAG_PRODUCER_DONE);
        }
        if (producer_done && (job->lead_ID == job->ready_ID)) {
            /* tell the foreman to stop if he's still working. */
            if (foreman->state != FINISHED)
                foreman->command = STOP;
            /* if all the workers are caught up too, we're done */
            if (trailing_ID == job->lead_ID)
                break;
        }

        /* if the foreman is done (end of pass) and the worker threads
         * have caught up...
         */
        if ((foreman->state == FINISHED) && (job->ready_ID == trailing_ID)) {
            break;
        }

        count++;
    }

    import_log_notice(job, "Workers finished; cleaning up...");

    /* Now tell all the workers to stop */
    for (current_worker = job->worker_list; current_worker != NULL;
         current_worker = current_worker->next) {
        if (current_worker->work_type != PRODUCER)
            current_worker->command = STOP;
    }

    /* Having done that, wait for them to say that they've stopped */
    for (current_worker = job->worker_list; current_worker != NULL; ) {
        if ((current_worker->state != FINISHED) &&
            (current_worker->state != ABORTED) &&
            (current_worker->work_type != PRODUCER)) {
            DS_Sleep(tenthsecond); /* Only sleep if we hit a thread that is still not done */
            continue;
        } else {
            current_worker = current_worker->next;
        }
    }
    import_log_notice(job, "Workers cleaned up.");

    /* If we're here and giveup is true, and the primary hadn't finished
     * processing the input files, we need to return IMPORT_INCOMPLETE_PASS */
    if (giveup && (job->input_filenames || (job->flags & FLAG_ONLINE) ||
                   (job->flags & FLAG_REINDEXING /* support multi-pass */))) {
        if (producer_done && (job->ready_ID == job->lead_ID)) {
            /* foreman caught up with the producer, and the producer is
             * done.
             */
            *status = IMPORT_COMPLETE_PASS;
        } else {
            *status = IMPORT_INCOMPLETE_PASS;
        }
    } else {
        *status = IMPORT_COMPLETE_PASS;
    }
    return 0;

error_abort:
    return ERR_IMPORT_ABORTED;
}
  711. /********** running passes **********/
  712. static int import_run_pass(ImportJob *job, int *status)
  713. {
  714. int ret = 0;
  715. /* Start the threads running */
  716. ret = import_start_threads(job);
  717. if (ret != 0) {
  718. import_log_notice(job, "Starting threads failed: %d\n", ret);
  719. goto error;
  720. }
  721. /* Monitor the threads until we're done or fail */
  722. ret = import_monitor_threads(job, status);
  723. if (ret == ERR_IMPORT_ABORTED) {
  724. goto error;
  725. } else if (ret != 0) {
  726. import_log_notice(job, "Thread monitoring aborted: %d\n", ret);
  727. goto error;
  728. }
  729. error:
  730. return ret;
  731. }
  732. static void import_set_abort_flag_all(ImportJob *job, int wait_for_them)
  733. {
  734. ImportWorkerInfo *worker;
  735. /* tell all the worker threads to abort */
  736. job->flags |= FLAG_ABORT;
  737. /* setting of the flag in the job will be detected in the worker, foreman
  738. * threads and if there are any threads which have a sleeptime 200 msecs
  739. * = import_sleep_time; after that time, they will examine the condition
  740. * (job->flags & FLAG_ABORT) which will unblock the thread to proceed to
  741. * abort. Hence, we will sleep here for atleast 3 sec to make sure clean
  742. * up occurs */
  743. /* allow all the aborts to be processed */
  744. DS_Sleep(PR_MillisecondsToInterval(3000));
  745. if (wait_for_them) {
  746. /* Having done that, wait for them to say that they've stopped */
  747. for (worker = job->worker_list; worker != NULL; ) {
  748. DS_Sleep(PR_MillisecondsToInterval(100));
  749. if ((worker->state != FINISHED) &&
  750. (worker->state != ABORTED)){
  751. continue;
  752. }
  753. else{
  754. worker = worker->next;
  755. }
  756. }
  757. }
  758. }
  759. /* tell all the threads to abort */
  760. void import_abort_all(ImportJob *job, int wait_for_them)
  761. {
  762. ImportWorkerInfo *worker;
  763. /* tell all the worker threads to abort */
  764. job->flags |= FLAG_ABORT;
  765. for (worker = job->worker_list; worker; worker = worker->next)
  766. worker->command = ABORT;
  767. if (wait_for_them) {
  768. /* Having done that, wait for them to say that they've stopped */
  769. for (worker = job->worker_list; worker != NULL; ) {
  770. DS_Sleep(PR_MillisecondsToInterval(100));
  771. if ((worker->state != FINISHED) &&
  772. (worker->state != ABORTED))
  773. continue;
  774. else
  775. worker = worker->next;
  776. }
  777. }
  778. }
  779. /* Helper function to make up filenames */
  780. int import_make_merge_filenames(char *directory, char *indexname, int pass,
  781. char **oldname, char **newname)
  782. {
  783. /* Filenames look like this: attributename<LDBM_FILENAME_SUFFIX>
  784. and need to be renamed to: attributename<LDBM_FILENAME_SUFFIX>.n
  785. where n is the pass number.
  786. */
  787. *oldname = slapi_ch_smprintf("%s/%s%s", directory, indexname, LDBM_FILENAME_SUFFIX);
  788. *newname = slapi_ch_smprintf("%s/%s.%d%s", directory, indexname, pass,
  789. LDBM_FILENAME_SUFFIX);
  790. if (!*oldname || !*newname) {
  791. slapi_ch_free_string(oldname);
  792. slapi_ch_free_string(newname);
  793. return -1;
  794. }
  795. return 0;
  796. }
  797. /* Task here is as follows:
  798. * First, if this is pass #1, check for the presence of a merge
  799. * directory. If it is not present, create it.
  800. * If it is present, delete all the files in it.
  801. * Then, flush the dblayer and close files.
  802. * Now create a numbered subdir of the merge directory for this pass.
  803. * Next, move the index files, except entrydn, parentid and id2entry to
  804. * the merge subdirectory. Important to move if we can, because
  805. * that can be millions of times faster than a copy.
  806. * Finally open the dblayer back up because the caller expects
  807. * us to not muck with it.
  808. */
  809. static int import_sweep_after_pass(ImportJob *job)
  810. {
  811. backend *be = job->inst->inst_be;
  812. int ret = 0;
  813. import_log_notice(job, "Sweeping files for merging later...");
  814. ret = dblayer_instance_close(be);
  815. if (0 == ret) {
  816. /* Walk the list of index jobs */
  817. ImportWorkerInfo *current_worker = NULL;
  818. for (current_worker = job->worker_list; current_worker != NULL;
  819. current_worker = current_worker->next) {
  820. /* Foreach job, rename the file to <filename>.n, where n is the
  821. * pass number */
  822. if ((current_worker->work_type != FOREMAN) &&
  823. (current_worker->work_type != PRODUCER) &&
  824. (strcasecmp(current_worker->index_info->name, "parentid") != 0)) {
  825. char *newname = NULL;
  826. char *oldname = NULL;
  827. ret = import_make_merge_filenames(job->inst->inst_dir_name,
  828. current_worker->index_info->name, job->current_pass,
  829. &oldname, &newname);
  830. if (0 != ret) {
  831. break;
  832. }
  833. if (PR_Access(oldname, PR_ACCESS_EXISTS) == PR_SUCCESS) {
  834. ret = PR_Rename(oldname, newname);
  835. if (ret != PR_SUCCESS) {
  836. PRErrorCode prerr = PR_GetError();
  837. import_log_notice(job, "Failed to rename file \"%s\" to \"%s\", "
  838. SLAPI_COMPONENT_NAME_NSPR " error %d (%s)",
  839. oldname, newname, prerr, slapd_pr_strerror(prerr));
  840. slapi_ch_free( (void**)&newname);
  841. slapi_ch_free( (void**)&oldname);
  842. break;
  843. }
  844. }
  845. slapi_ch_free( (void**)&newname);
  846. slapi_ch_free( (void**)&oldname);
  847. }
  848. }
  849. ret = dblayer_instance_start(be, DBLAYER_IMPORT_MODE);
  850. }
  851. if (0 == ret) {
  852. import_log_notice(job, "Sweep done.");
  853. } else {
  854. if (ENOSPC == ret) {
  855. import_log_notice(job, "ERROR: NO DISK SPACE LEFT in sweep phase");
  856. } else {
  857. import_log_notice(job, "ERROR: Sweep phase error %d (%s)", ret,
  858. dblayer_strerror(ret));
  859. }
  860. }
  861. return ret;
  862. }
  863. /* when the import is done, this function is called to bring stuff back up.
  864. * returns 0 on success; anything else is an error
  865. */
/* when the import is done, this function is called to bring stuff back up.
 * returns 0 on success; anything else is an error
 *
 * Side effects: writes the DBVERSION file (marks the db as OK for future
 * startups), finishes the cn=tasks entry when no other thread still holds
 * a reference, and — for online imports — clears the INDEX_OFFLINE flag on
 * every index, restarts the instance, resets the USN counter and re-enables
 * the backend.
 */
static int import_all_done(ImportJob *job, int ret)
{
    ldbm_instance *inst = job->inst;

    /* Writing this file indicates to future server startups that
     * the db is OK */
    if (ret == 0) {
        char inst_dir[MAXPATHLEN*2];
        char *inst_dirp = NULL;
        inst_dirp = dblayer_get_full_inst_dir(inst->inst_li, inst,
                                              inst_dir, MAXPATHLEN*2);
        ret = dbversion_write(inst->inst_li, inst_dirp, NULL);
        /* dblayer_get_full_inst_dir may hand back either the stack buffer
         * or a heap allocation; only the latter is freed here */
        if (inst_dirp != inst_dir)
            slapi_ch_free_string(&inst_dirp);
    }

    /* finish the task only when nobody else still holds a reference */
    if ((job->task != NULL) && (0 == slapi_task_get_refcount(job->task))) {
        slapi_task_finish(job->task, ret);
    }

    if (job->flags & FLAG_ONLINE) {
        /* make sure the indexes are online as well */
        /* richm 20070919 - if index entries are added online, they
           are created and marked as INDEX_OFFLINE, in anticipation
           of someone doing a db2index. In this case, the db2index
           code will correctly unset the INDEX_OFFLINE flag.
           However, if import is used to create the indexes, the
           INDEX_OFFLINE flag will not be cleared. So, we do that
           here
         */
        IndexInfo *index = job->index_list;
        while (index != NULL) {
            index->ai->ai_indexmask &= ~INDEX_OFFLINE;
            index = index->next;
        }
        /* start up the instance */
        ret = dblayer_instance_start(job->inst->inst_be, DBLAYER_NORMAL_MODE);
        if (ret != 0)
            return ret;

        /* Reset USN slapi_counter with the last key of the entryUSN index */
        ldbm_set_last_usn(inst->inst_be);

        /* bring backend online again */
        slapi_mtn_be_enable(inst->inst_be);
    }
    return ret;
}
/* Main driver for an (offline or online) import job.
 * arg is really an (ImportJob *).  Runs the producer/foreman/worker thread
 * machinery for as many passes as needed, merges multi-pass results,
 * post-processes (numsubordinates, ancestorid, cache flush), and tears
 * everything down.  Returns 0 on success, non-zero on failure; on failure
 * the instance directory is deleted (the db would be half-built).
 * Also called directly for offline import; online import runs it on its
 * own thread via import_main().
 */
int import_main_offline(void *arg)
{
    ImportJob *job = (ImportJob *)arg;
    ldbm_instance *inst = job->inst;
    backend *be = inst->inst_be;
    int ret = 0;
    time_t beginning = 0;
    time_t end = 0;
    int finished = 0;
    int status = 0;
    int verbose = 1;
    ImportWorkerInfo *producer = NULL;

    /* hold a task reference for the duration of the import; released on
     * the success path below (the failure path finishes the task instead) */
    if (job->task)
        slapi_task_inc_refcount(job->task);

    PR_ASSERT(inst != NULL);
    time(&beginning);

    /* Decide which indexes are needed */
    if (job->flags & FLAG_INDEX_ATTRS) {
        /* Here, we get an AVL tree which contains nodes for all attributes
         * in the schema. Given this tree, we need to identify those nodes
         * which are marked for indexing. */
        avl_apply(job->inst->inst_attrs, (IFP)import_attr_callback,
                  (caddr_t)job, -1, AVL_INORDER);
        vlv_getindices((IFP)import_attr_callback, (void *)job, be);
    }

    /* Determine how much index buffering space to allocate to each index */
    import_set_index_buffer_size(job);

    /* initialize the entry FIFO */
    ret = import_fifo_init(job);
    if (ret) {
        /* wire import (replica init): wake the supplier side so it does
         * not block forever on wire_cv */
        if (! (job->flags & FLAG_USE_FILES)) {
            PR_Lock(job->wire_lock);
            PR_NotifyCondVar(job->wire_cv);
            PR_Unlock(job->wire_lock);
        }
        goto error;
    }

    if (job->flags & FLAG_USE_FILES) {
        /* importing from files: start up a producer thread to read the
         * files and queue them
         */
        producer = CALLOC(ImportWorkerInfo);
        if (! producer)
            goto error;
        /* start the producer */
        import_init_worker_info(producer, job);
        producer->work_type = PRODUCER;
        /* reindexing reads entries back out of id2entry; a fresh import
         * reads them from the LDIF files */
        if (job->flags & FLAG_REINDEXING)
        {
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)index_producer, producer,
                                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD,
                                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY,
                          "unable to spawn index producer thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                goto error;
            }
        }
        else
        {
            import_log_notice(job, "Beginning import job...");
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_producer, producer,
                                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD,
                                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY,
                          "unable to spawn import producer thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                goto error;
            }
        }

        if (0 == job->job_index_buffer_suggestion)
            import_log_notice(job, "Index buffering is disabled.");
        else
            import_log_notice(job,
                              "Index buffering enabled with bucket size %lu",
                              job->job_index_buffer_suggestion);

        job->worker_list = producer;
    } else {
        /* release the startup lock and let the entries start queueing up
         * in for import */
        PR_Lock(job->wire_lock);
        PR_NotifyCondVar(job->wire_cv);
        PR_Unlock(job->wire_lock);
    }

    /* Run as many passes as we need to complete the job or die honourably in
     * the attempt */
    while (! finished) {
        job->current_pass++;
        job->total_pass++;
        ret = import_run_pass(job, &status);
        /* The following could have happened:
         *     (a) Some error happened such that we're hosed.
         *         This is indicated by a non-zero return code.
         *     (b) We finished the complete file without needing a second pass
         *         This is indicated by a zero return code and a status of
         *         IMPORT_COMPLETE_PASS and current_pass == 1;
         *     (c) We completed a pass and need at least another one
         *         This is indicated by a zero return code and a status of
         *         IMPORT_INCOMPLETE_PASS
         *     (d) We just completed what turned out to be the last in a
         *         series of passes
         *         This is indicated by a zero return code and a status of
         *         IMPORT_COMPLETE_PASS and current_pass > 1
         */
        if (ret == ERR_IMPORT_ABORTED) {
            /* at least one of the threads has aborted -- shut down ALL
             * of the threads */
            import_log_notice(job, "Aborting all import threads...");
            /* this abort sets the abort flag on the threads and will block for
             * the exit of all threads
             */
            import_set_abort_flag_all(job, 1);
            import_log_notice(job, "Import threads aborted.");
            goto error;
        }
        if (0 != ret) {
            /* Some horrible fate has befallen the import */
            import_log_notice(job, "Fatal pass error %d", ret);
            goto error;
        }
        /* No error, but a number of possibilities */
        if ( IMPORT_COMPLETE_PASS == status ) {
            if (1 == job->current_pass) {
                /* We're done !!!! */ ;
            } else {
                /* Save the files, then merge */
                ret = import_sweep_after_pass(job);
                if (0 != ret) {
                    goto error;
                }
                ret = import_mega_merge(job);
                if (0 != ret) {
                    goto error;
                }
            }
            finished = 1;
        } else {
            if (IMPORT_INCOMPLETE_PASS == status) {
                /* Need to go round again */
                /* Time to save the files we've built for later */
                ret = import_sweep_after_pass(job);
                if (0 != ret) {
                    goto error;
                }
                /* optionally merge intermediate results early so we don't
                 * accumulate an unbounded number of per-pass file sets */
                if ( (inst->inst_li->li_maxpassbeforemerge != 0) &&
                     (job->current_pass > inst->inst_li->li_maxpassbeforemerge) )
                {
                    ret = import_mega_merge(job);
                    if (0 != ret) {
                        goto error;
                    }
                    job->current_pass = 1;
                    ret = import_sweep_after_pass(job);
                    if (0 != ret) {
                        goto error;
                    }
                }
                /* Fixup the first_ID value to reflect previous work */
                job->first_ID = job->ready_ID + 1;
                import_free_thread_data(job);
                job->worker_list = producer;
                import_log_notice(job, "Beginning pass number %d",
                                  job->total_pass+1);
            } else {
                /* Bizarro-slapd: unknown status value */
                goto error;
            }
        }
    }

    /* kill the producer now; we're done */
    if (producer) {
        import_log_notice(job, "Cleaning up producer thread...");
        producer->command = STOP;
        /* wait for the lead thread to stop */
        while (producer->state != FINISHED) {
            DS_Sleep(PR_MillisecondsToInterval(100));
        }
    }

    /* Now do the numsubordinates attribute */
    import_log_notice(job, "Indexing complete.  Post-processing...");
    /* [610066] reindexed db cannot be used in the following backup/restore */
    if ( !(job->flags & FLAG_REINDEXING) &&
         (ret = update_subordinatecounts(be, job->mothers, NULL)) != 0) {
        import_log_notice(job, "Failed to update numsubordinates attributes");
        goto error;
    }

    /* And the ancestorid index */
    if ((ret = ldbm_ancestorid_create_index(be)) != 0) {
        import_log_notice(job, "Failed to create ancestorid index");
        goto error;
    }

    import_log_notice(job, "Flushing caches...");
    if (0 != (ret = dblayer_flush(job->inst->inst_li)) ) {
        import_log_notice(job, "Failed to flush database");
        goto error;
    }

    /* New way to exit the routine: check the return code.
     * If it's non-zero, delete the database files.
     * Otherwise don't, but always close the database layer properly.
     * Then return. This ensures that we can't make a half-good/half-bad
     * Database. */
error:
    /* If we fail, the database is now in a mess, so we delete it */
    import_log_notice(job, "Closing files...");
    cache_clear(&job->inst->inst_cache);
    if (0 != ret) {
        dblayer_delete_instance_dir(be);
        dblayer_instance_close(job->inst->inst_be);
    } else {
        if (0 != (ret = dblayer_instance_close(job->inst->inst_be)) ) {
            import_log_notice(job, "Failed to close database");
        }
    }
    if (!(job->flags & FLAG_ONLINE))
        dblayer_close(job->inst->inst_li, DBLAYER_IMPORT_MODE);
    time(&end);
    if (verbose && (0 == ret)) {
        int seconds_to_import = end - beginning;
        /* NOTE(review): entries_processed is size_t but printed with %lu
         * below — mismatched on LLP64 platforms; confirm or switch to %zu */
        size_t entries_processed = job->lead_ID - (job->starting_ID - 1);
        /* NOTE(review): if the import finished in under a second,
         * seconds_to_import is 0 and this divides by zero (yields inf) */
        double entries_per_second = (double) entries_processed /
                                    (double) seconds_to_import;

        if (job->not_here_skipped)
        {
            if (job->skipped)
                import_log_notice(job, "Import complete.  Processed %lu entries "
                                  "(%d bad entries were skipped, "
                                  "%d entries were skipped because they don't "
                                  "belong to this database) in %d seconds. "
                                  "(%.2f entries/sec)", entries_processed,
                                  job->skipped, job->not_here_skipped,
                                  seconds_to_import, entries_per_second);
            else
                import_log_notice(job, "Import complete.  Processed %lu entries "
                                  "(%d entries were skipped because they don't "
                                  "belong to this database) "
                                  "in %d seconds. (%.2f entries/sec)",
                                  entries_processed, job->not_here_skipped,
                                  seconds_to_import, entries_per_second);
        }
        else
        {
            if (job->skipped)
                import_log_notice(job, "Import complete.  Processed %lu entries "
                                  "(%d were skipped) in %d seconds. "
                                  "(%.2f entries/sec)", entries_processed,
                                  job->skipped, seconds_to_import,
                                  entries_per_second);
            else
                import_log_notice(job, "Import complete.  Processed %lu entries "
                                  "in %d seconds. (%.2f entries/sec)",
                                  entries_processed, seconds_to_import,
                                  entries_per_second);
        }
    }

    if (0 != ret) {
        import_log_notice(job, "Import failed.");
        if (job->task != NULL) {
            slapi_task_finish(job->task, ret);
        }
    } else {
        /* drop the reference taken at the top; import_all_done() finishes
         * the task if the refcount hit zero */
        if (job->task)
            slapi_task_dec_refcount(job->task);
        import_all_done(job, ret);
    }

    /* This instance isn't busy anymore */
    instance_set_not_busy(job->inst);

    import_free_job(job);
    if (producer)
        FREE(producer);

    return(ret);
}
  1186. /*
  1187. * to be called by online import using PR_CreateThread()
  1188. * offline import directly calls import_main_offline()
  1189. *
  1190. */
/* Thread entry point for online import (spawned via PR_CreateThread);
 * simply delegates to import_main_offline(), which does all the work.
 * arg is really an (ImportJob *). */
void import_main(void *arg)
{
    import_main_offline(arg);
}
  1195. int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb)
  1196. {
  1197. backend *be = NULL;
  1198. int noattrindexes = 0;
  1199. ImportJob *job = NULL;
  1200. char **name_array = NULL;
  1201. int total_files, i;
  1202. PRThread *thread = NULL;
  1203. job = CALLOC(ImportJob);
  1204. if (job == NULL) {
  1205. LDAPDebug(LDAP_DEBUG_ANY, "not enough memory to do import job\n",
  1206. 0, 0, 0);
  1207. return -1;
  1208. }
  1209. slapi_pblock_get( pb, SLAPI_BACKEND, &be);
  1210. PR_ASSERT(NULL != be);
  1211. job->inst = (ldbm_instance *)be->be_instance_info;
  1212. slapi_pblock_get( pb, SLAPI_LDIF2DB_NOATTRINDEXES, &noattrindexes );
  1213. slapi_pblock_get( pb, SLAPI_LDIF2DB_FILE, &name_array );
  1214. /* the removedupvals field is blatantly overloaded here to mean
  1215. * the chunk size too. (chunk size = number of entries that should
  1216. * be imported before starting a new pass. usually for debugging.)
  1217. */
  1218. slapi_pblock_get(pb, SLAPI_LDIF2DB_REMOVEDUPVALS, &job->merge_chunk_size);
  1219. if (job->merge_chunk_size == 1)
  1220. job->merge_chunk_size = 0;
  1221. /* get list of specifically included and/or excluded subtrees from
  1222. * the front-end */
  1223. ldbm_back_fetch_incl_excl(pb, &job->include_subtrees,
  1224. &job->exclude_subtrees);
  1225. /* get cn=tasks info, if any */
  1226. slapi_pblock_get(pb, SLAPI_BACKEND_TASK, &job->task);
  1227. slapi_pblock_get(pb, SLAPI_LDIF2DB_ENCRYPT, &job->encrypt);
  1228. /* get uniqueid info */
  1229. slapi_pblock_get(pb, SLAPI_LDIF2DB_GENERATE_UNIQUEID, &job->uuid_gen_type);
  1230. if (job->uuid_gen_type == SLAPI_UNIQUEID_GENERATE_NAME_BASED) {
  1231. char *namespaceid;
  1232. slapi_pblock_get(pb, SLAPI_LDIF2DB_NAMESPACEID, &namespaceid);
  1233. job->uuid_namespace = slapi_ch_strdup(namespaceid);
  1234. }
  1235. job->flags = FLAG_USE_FILES;
  1236. if (NULL == name_array) /* no ldif file is given -> reindexing */
  1237. job->flags |= FLAG_REINDEXING;
  1238. if (!noattrindexes)
  1239. job->flags |= FLAG_INDEX_ATTRS;
  1240. for (i = 0; name_array && name_array[i] != NULL; i++)
  1241. charray_add(&job->input_filenames, slapi_ch_strdup(name_array[i]));
  1242. job->starting_ID = 1;
  1243. job->first_ID = 1;
  1244. job->mothers = CALLOC(import_subcount_stuff);
  1245. /* how much space should we allocate to index buffering? */
  1246. job->job_index_buffer_size = import_get_index_buffer_size();
  1247. if (job->job_index_buffer_size == 0) {
  1248. /* 10% of the allocated cache size + one meg */
  1249. PR_Lock(job->inst->inst_li->li_config_mutex);
  1250. job->job_index_buffer_size = (job->inst->inst_li->li_import_cachesize/10) +
  1251. (1024*1024);
  1252. PR_Unlock(job->inst->inst_li->li_config_mutex);
  1253. }
  1254. import_subcount_stuff_init(job->mothers);
  1255. if (job->task != NULL) {
  1256. /* count files, use that to track "progress" in cn=tasks */
  1257. total_files = 0;
  1258. while (name_array && name_array[total_files] != NULL)
  1259. total_files++;
  1260. /* add 1 to account for post-import cleanup (which can take a
  1261. * significant amount of time)
  1262. */
  1263. /* NGK - This should eventually be cleaned up to use the public
  1264. * task API. */
  1265. if (0 == total_files) /* reindexing */
  1266. job->task->task_work = 2;
  1267. else
  1268. job->task->task_work = total_files + 1;
  1269. job->task->task_progress = 0;
  1270. job->task->task_state = SLAPI_TASK_RUNNING;
  1271. slapi_task_set_data(job->task, job);
  1272. slapi_task_set_destructor_fn(job->task, import_task_destroy);
  1273. slapi_task_set_cancel_fn(job->task, import_task_abort);
  1274. job->flags |= FLAG_ONLINE;
  1275. /* create thread for import_main, so we can return */
  1276. thread = PR_CreateThread(PR_USER_THREAD, import_main, (void *)job,
  1277. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
  1278. PR_UNJOINABLE_THREAD,
  1279. SLAPD_DEFAULT_THREAD_STACKSIZE);
  1280. if (thread == NULL) {
  1281. PRErrorCode prerr = PR_GetError();
  1282. LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import thread, "
  1283. SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  1284. prerr, slapd_pr_strerror(prerr), 0);
  1285. import_free_job(job);
  1286. FREE(job);
  1287. return -2;
  1288. }
  1289. return 0;
  1290. }
  1291. /* old style -- do it all synchronously (THIS IS GOING AWAY SOON) */
  1292. return import_main_offline((void *)job);
  1293. }