import.c 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720
  1. /** BEGIN COPYRIGHT BLOCK
  2. * This Program is free software; you can redistribute it and/or modify it under
  3. * the terms of the GNU General Public License as published by the Free Software
  4. * Foundation; version 2 of the License.
  5. *
  6. * This Program is distributed in the hope that it will be useful, but WITHOUT
  7. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  8. * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  9. *
  10. * You should have received a copy of the GNU General Public License along with
  11. * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
  12. * Place, Suite 330, Boston, MA 02111-1307 USA.
  13. *
  14. * In addition, as a special exception, Red Hat, Inc. gives You the additional
  15. * right to link the code of this Program with code not covered under the GNU
  16. * General Public License ("Non-GPL Code") and to distribute linked combinations
  17. * including the two, subject to the limitations in this paragraph. Non-GPL Code
  18. * permitted under this exception must only link to the code of this Program
  19. * through those well defined interfaces identified in the file named EXCEPTION
  20. * found in the source code files (the "Approved Interfaces"). The files of
  21. * Non-GPL Code may instantiate templates or use macros or inline functions from
  22. * the Approved Interfaces without causing the resulting work to be covered by
  23. * the GNU General Public License. Only Red Hat, Inc. may make changes or
  24. * additions to the list of Approved Interfaces. You must obey the GNU General
  25. * Public License in all respects for all of the Program code and other code used
  26. * in conjunction with the Program except the Non-GPL Code covered by this
  27. * exception. If you modify this file, you may extend this exception to your
  28. * version of the file, but you are not obligated to do so. If you do not wish to
  29. * provide this exception without modification, you must delete this exception
  30. * statement from your version and license this file solely under the GPL without
  31. * exception.
  32. *
  33. *
  34. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  35. * Copyright (C) 2005 Red Hat, Inc.
  36. * All rights reserved.
  37. * END COPYRIGHT BLOCK **/
  38. #ifdef HAVE_CONFIG_H
  39. # include <config.h>
  40. #endif
  41. /*
  42. * the "new" ("deluxe") backend import code
  43. *
  44. * please make sure you use 4-space indentation on this file.
  45. */
  46. #include "back-ldbm.h"
  47. #include "vlv_srch.h"
  48. #include "import.h"
  49. #define ERR_IMPORT_ABORTED -23
  50. #define NEED_DN_NORM -24
  51. #define NEED_DN_NORM_SP -25
  52. #define NEED_DN_NORM_BT -26
  53. /********** routines to manipulate the entry fifo **********/
  54. /* this is pretty bogus -- could be a HUGE amount of memory */
  55. /* Not anymore with the Import Queue Adaptative Algorithm (Regulation) */
  56. #define MAX_FIFO_SIZE 8000
  57. static int import_fifo_init(ImportJob *job)
  58. {
  59. ldbm_instance *inst = job->inst;
  60. /* Work out how big the entry fifo can be */
  61. if (inst->inst_cache.c_maxentries > 0)
  62. job->fifo.size = inst->inst_cache.c_maxentries;
  63. else
  64. job->fifo.size = inst->inst_cache.c_maxsize / 1024; /* guess */
  65. /* byte limit that should be respected to avoid memory starvation */
  66. /* conservative computing: multiply by .8 to allow for reasonable overflow */
  67. job->fifo.bsize = (inst->inst_cache.c_maxsize/10) << 3;
  68. job->fifo.c_bsize = 0;
  69. if (job->fifo.size > MAX_FIFO_SIZE)
  70. job->fifo.size = MAX_FIFO_SIZE;
  71. /* has to be at least 1 or 2, and anything less than about 100 destroys
  72. * the point of doing all this optimization in the first place. */
  73. if (job->fifo.size < 100)
  74. job->fifo.size = 100;
  75. /* Get memory for the entry fifo */
  76. /* This is used to keep a ref'ed pointer to the last <cachesize>
  77. * processed entries */
  78. PR_ASSERT(NULL == job->fifo.item);
  79. job->fifo.item = (FifoItem *)slapi_ch_calloc(job->fifo.size,
  80. sizeof(FifoItem));
  81. if (NULL == job->fifo.item) {
  82. /* Memory allocation error */
  83. return -1;
  84. }
  85. return 0;
  86. }
  87. FifoItem *import_fifo_fetch(ImportJob *job, ID id, int worker)
  88. {
  89. int idx = id % job->fifo.size;
  90. FifoItem *fi;
  91. if (job->fifo.item) {
  92. fi = &(job->fifo.item[idx]);
  93. } else {
  94. return NULL;
  95. }
  96. if (fi->entry) {
  97. if (worker) {
  98. if (fi->bad) {
  99. if (fi->bad == FIFOITEM_BAD) {
  100. fi->bad = FIFOITEM_BAD_PRINTED;
  101. if (!(job->flags & FLAG_UPGRADEDNFORMAT_V1)) {
  102. import_log_notice(job, "WARNING: bad entry: ID %d", id);
  103. }
  104. }
  105. return NULL;
  106. }
  107. PR_ASSERT(fi->entry->ep_refcnt > 0);
  108. }
  109. }
  110. return fi;
  111. }
  112. static void import_fifo_destroy(ImportJob *job)
  113. {
  114. /* Free any entries in the fifo first */
  115. struct backentry *be = NULL;
  116. size_t i = 0;
  117. for (i = 0; i < job->fifo.size; i++) {
  118. be = job->fifo.item[i].entry;
  119. backentry_free(&be);
  120. job->fifo.item[i].entry = NULL;
  121. job->fifo.item[i].filename = NULL;
  122. }
  123. slapi_ch_free((void **)&job->fifo.item);
  124. job->fifo.item = NULL;
  125. }
  126. /********** logging stuff **********/
  127. #define LOG_BUFFER 512
  128. /* this changes the 'nsTaskStatus' value, which is transient (anything logged
  129. * here wipes out any previous status)
  130. */
  131. static void import_log_status_start(ImportJob *job)
  132. {
  133. if (! job->task_status)
  134. job->task_status = (char *)slapi_ch_malloc(10 * LOG_BUFFER);
  135. if (! job->task_status)
  136. return; /* out of memory? */
  137. job->task_status[0] = 0;
  138. }
  139. static void import_log_status_add_line(ImportJob *job, char *format, ...)
  140. {
  141. va_list ap;
  142. int len = 0;
  143. if (! job->task_status)
  144. return;
  145. len = strlen(job->task_status);
  146. if (len + 5 > (10 * LOG_BUFFER))
  147. return; /* no room */
  148. if (job->task_status[0])
  149. strcat(job->task_status, "\n");
  150. va_start(ap, format);
  151. PR_vsnprintf(job->task_status + len, (10 * LOG_BUFFER) - len, format, ap);
  152. va_end(ap);
  153. }
  154. static void import_log_status_done(ImportJob *job)
  155. {
  156. if (job->task) {
  157. slapi_task_log_status(job->task, "%s", job->task_status);
  158. }
  159. }
/* this adds a line to the 'nsTaskLog' value, which is cumulative (anything
 * logged here is added to the end)
 */
/*
 * Format a message (truncated to LOG_BUFFER bytes), append it to the
 * task's cumulative log when a task is attached, and mirror it to the
 * server error log with a prefix chosen from the job flags
 * (upgradedn / reindex / plain import).
 */
void import_log_notice(ImportJob *job, char *format, ...)
{
    va_list ap;
    char buffer[LOG_BUFFER];

    va_start(ap, format);
    PR_vsnprintf(buffer, LOG_BUFFER, format, ap);
    va_end(ap);
    if (job->task) {
        slapi_task_log_notice(job->task, "%s", buffer);
    }
    /* also save it in the logs for posterity */
    if (job->flags & (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1)) {
        LDAPDebug(LDAP_DEBUG_ANY, "upgradedn %s: %s\n", job->inst->inst_name,
                  buffer, 0);
    } else if (job->flags & FLAG_REINDEXING) {
        LDAPDebug(LDAP_DEBUG_ANY, "reindex %s: %s\n", job->inst->inst_name,
                  buffer, 0);
    } else {
        LDAPDebug(LDAP_DEBUG_ANY, "import %s: %s\n", job->inst->inst_name,
                  buffer, 0);
    }
}
  185. static void import_task_destroy(Slapi_Task *task)
  186. {
  187. ImportJob *job = (ImportJob *)slapi_task_get_data(task);
  188. if (job && job->task_status) {
  189. slapi_ch_free((void **)&job->task_status);
  190. job->task_status = NULL;
  191. }
  192. FREE(job);
  193. slapi_task_set_data(task, NULL);
  194. }
  195. static void import_task_abort(Slapi_Task *task)
  196. {
  197. ImportJob *job;
  198. /* don't log anything from here, because we're still holding the
  199. * DSE lock for modify...
  200. */
  201. if (slapi_task_get_state(task) == SLAPI_TASK_FINISHED) {
  202. /* too late */
  203. }
  204. /*
  205. * Race condition.
  206. * If the import thread happens to finish right now we're in trouble
  207. * because it will free the job.
  208. */
  209. job = (ImportJob *)slapi_task_get_data(task);
  210. import_abort_all(job, 0);
  211. while (slapi_task_get_state(task) != SLAPI_TASK_FINISHED)
  212. DS_Sleep(PR_MillisecondsToInterval(100));
  213. }
  214. /********** helper functions for importing **********/
/* Function used to gather a list of indexed attrs */
/*
 * avl_apply()-style callback over the instance's attrinfo nodes: for
 * every attribute that should get its own worker thread, push an
 * IndexInfo onto job->index_list and bump job->number_indexers.
 * Runs single-threaded, so the job structure needs no locking.
 * Returns 0 to continue the walk, -1 on allocation failure.
 */
static int import_attr_callback(void *node, void *param)
{
    ImportJob *job = (ImportJob *)param;
    struct attrinfo *a = (struct attrinfo *)node;

    if (job->flags & FLAG_DRYRUN) { /* dryrun; we don't need the workers */
        return 0;
    }
    if (job->flags & (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1)) {
        /* Bring up import workers just for indexes having DN syntax
         * attribute type. (except entrydn -- taken care below) */
        int rc = 0;
        Slapi_Attr attr = {0};

        /*
         * Treat cn and ou specially.  Bring up the import workers for
         * cn and ou even though they are not DN syntax attributes.
         * This is done because they have some exceptional case to store
         * DN format in the admin entries such as UserPreferences.
         */
        if ((0 == PL_strcasecmp("cn", a->ai_type)) ||
            (0 == PL_strcasecmp("commonname", a->ai_type)) ||
            (0 == PL_strcasecmp("ou", a->ai_type)) ||
            (0 == PL_strcasecmp("organizationalUnit", a->ai_type))) {
            ;   /* fall through: always index these */
        } else {
            slapi_attr_init(&attr, a->ai_type);
            rc = slapi_attr_is_dn_syntax_attr(&attr);
            attr_done(&attr);
            if (0 == rc) {
                /* not DN syntax; no worker needed for the upgrade */
                return 0;
            }
        }
    }
    /* OK, so we now have hold of the attribute structure and the job info,
     * let's see what we have.  Remember that although this function is called
     * many times, all these calls are in the context of a single thread, so we
     * don't need to worry about protecting the data in the job structure.
     */

    /* We need to specifically exclude the (entrydn, entryrdn) & parentid &
     * ancestorid indexes because we build those in the foreman thread.
     */
    if (IS_INDEXED(a->ai_indexmask) &&
        (strcasecmp(a->ai_type, LDBM_ENTRYDN_STR) != 0) &&
        (strcasecmp(a->ai_type, LDBM_ENTRYRDN_STR) != 0) &&
        (strcasecmp(a->ai_type, LDBM_PARENTID_STR) != 0) &&
        (strcasecmp(a->ai_type, LDBM_ANCESTORID_STR) != 0) &&
        (strcasecmp(a->ai_type, numsubordinates) != 0)) {
        /* Make an import_index_info structure, fill it in and insert into the
         * job's list */
        IndexInfo *info = CALLOC(IndexInfo);

        if (NULL == info) {
            /* Memory allocation error */
            return -1;
        }
        info->name = slapi_ch_strdup(a->ai_type);
        info->ai = a;
        if (NULL == info->name) {
            /* Memory allocation error */
            FREE(info);
            return -1;
        }
        /* push onto the head of the job's index list */
        info->next = job->index_list;
        job->index_list = info;
        job->number_indexers++;
    }
    return 0;
}
  282. static void import_set_index_buffer_size(ImportJob *job)
  283. {
  284. IndexInfo *current_index = NULL;
  285. size_t substring_index_count = 0;
  286. size_t proposed_size = 0;
  287. /* Count the substring indexes we have */
  288. for (current_index = job->index_list; current_index != NULL;
  289. current_index = current_index->next) {
  290. if (current_index->ai->ai_indexmask & INDEX_SUB) {
  291. substring_index_count++;
  292. }
  293. }
  294. if (substring_index_count > 0) {
  295. /* Make proposed size such that if all substring indices were
  296. * reasonably full, we'd hit the target space */
  297. proposed_size = (job->job_index_buffer_size / substring_index_count) /
  298. IMPORT_INDEX_BUFFER_SIZE_CONSTANT;
  299. if (proposed_size > IMPORT_MAX_INDEX_BUFFER_SIZE) {
  300. proposed_size = IMPORT_MAX_INDEX_BUFFER_SIZE;
  301. }
  302. if (proposed_size < IMPORT_MIN_INDEX_BUFFER_SIZE) {
  303. proposed_size = 0;
  304. }
  305. }
  306. job->job_index_buffer_suggestion = proposed_size;
  307. }
  308. static void import_free_thread_data(ImportJob *job)
  309. {
  310. /* DBDB free the lists etc */
  311. ImportWorkerInfo *worker = job->worker_list;
  312. while (worker != NULL) {
  313. ImportWorkerInfo *asabird = worker;
  314. worker = worker->next;
  315. if (asabird->work_type != PRODUCER)
  316. slapi_ch_free( (void**)&asabird);
  317. }
  318. }
/*
 * Free everything owned by an import job: worker-thread records, the
 * index list, the subcount "mothers" table, include/exclude subtree
 * lists, input filenames, the entry fifo, the uuid namespace, the
 * wire-import lock/condvar, and the task status buffer.
 * The ImportJob structure itself is NOT freed here; the caller owns it.
 */
void import_free_job(ImportJob *job)
{
    /* DBDB free the lists etc */
    IndexInfo *index = job->index_list;

    import_free_thread_data(job);
    while (index != NULL) {
        IndexInfo *asabird = index;
        index = index->next;
        slapi_ch_free( (void**)&asabird->name);
        slapi_ch_free( (void**)&asabird);
    }
    job->index_list = NULL;
    if (NULL != job->mothers) {
        import_subcount_stuff_term(job->mothers);
        slapi_ch_free( (void**)&job->mothers);
    }
    ldbm_back_free_incl_excl(job->include_subtrees, job->exclude_subtrees);
    charray_free(job->input_filenames);
    /* fifo.size != 0 implies import_fifo_init() ran for this job */
    if (job->fifo.size)
        import_fifo_destroy(job);
    if (NULL != job->uuid_namespace)
        slapi_ch_free((void **)&job->uuid_namespace);
    if (job->wire_lock)
        PR_DestroyLock(job->wire_lock);
    if (job->wire_cv)
        PR_DestroyCondVar(job->wire_cv);
    slapi_ch_free((void **)&job->task_status);
}
  347. /* determine if we are the correct backend for this entry
  348. * (in a distributed suffix, some entries may be for other backends).
  349. * if the entry's dn actually matches one of the suffixes of the be, we
  350. * automatically take it as a belonging one, for such entries must be
  351. * present in EVERY backend independently of the distribution applied.
  352. */
  353. int import_entry_belongs_here(Slapi_Entry *e, backend *be)
  354. {
  355. Slapi_Backend *retbe;
  356. Slapi_DN *sdn = slapi_entry_get_sdn(e);
  357. if (slapi_be_issuffix(be, sdn))
  358. return 1;
  359. retbe = slapi_mapping_tree_find_backend_for_sdn(sdn);
  360. return (retbe == be);
  361. }
  362. /********** starting threads and stuff **********/
  363. /* Solaris is weird---we need an LWP per thread but NSPR doesn't give us
  364. * one unless we make this magic belshe-call */
  365. /* Fixed on Solaris 8; NSPR supports PR_GLOBAL_BOUND_THREAD */
  366. #define CREATE_THREAD PR_CreateThread
  367. static void import_init_worker_info(ImportWorkerInfo *info, ImportJob *job)
  368. {
  369. info->command = PAUSE;
  370. info->job = job;
  371. info->first_ID = job->first_ID;
  372. info->index_buffer_size = job->job_index_buffer_suggestion;
  373. }
/*
 * Spawn the foreman thread and (when attribute indexing is enabled)
 * one worker thread per index on job->index_list.  Each thread's
 * ImportWorkerInfo is pushed onto job->worker_list as it is started,
 * so on failure import_abort_all() can stop everything launched so
 * far.  Returns 0 on success, -1 after aborting on any failure.
 */
static int import_start_threads(ImportJob *job)
{
    IndexInfo *current_index = NULL;
    ImportWorkerInfo *foreman = NULL, *worker = NULL;

    foreman = CALLOC(ImportWorkerInfo);
    if (!foreman)
        goto error;

    /* start the foreman */
    import_init_worker_info(foreman, job);
    foreman->work_type = FOREMAN;
    if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_foreman, foreman,
                        PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                        PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
        PRErrorCode prerr = PR_GetError();
        LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import foreman thread, "
                  SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                  prerr, slapd_pr_strerror(prerr), 0);
        FREE(foreman);
        goto error;
    }

    /* thread started OK; it now owns its info record via the list */
    foreman->next = job->worker_list;
    job->worker_list = foreman;

    /* Start follower threads, if we are doing attribute indexing */
    current_index = job->index_list;
    if (job->flags & FLAG_INDEX_ATTRS) {
        while (current_index) {
            /* make a new thread info structure */
            worker = CALLOC(ImportWorkerInfo);
            if (! worker)
                goto error;

            /* fill it in */
            import_init_worker_info(worker, job);
            worker->index_info = current_index;
            worker->work_type = WORKER;

            /* Start the thread */
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_worker, worker,
                                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD,
                                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import worker thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                FREE(worker);
                goto error;
            }

            /* link it onto the job's thread list */
            worker->next = job->worker_list;
            job->worker_list = worker;
            current_index = current_index->next;
        }
    }
    return 0;

error:
    import_log_notice(job, "Import thread creation failed.");
    import_log_notice(job, "Aborting all import threads...");
    /* stops (and waits for) every thread already on job->worker_list */
    import_abort_all(job, 1);
    import_log_notice(job, "Import threads aborted.");
    return -1;
}
  434. /********** monitoring the worker threads **********/
  435. static void import_clear_progress_history(ImportJob *job)
  436. {
  437. int i = 0;
  438. for (i = 0; i < IMPORT_JOB_PROG_HISTORY_SIZE /*- 1*/; i++) {
  439. job->progress_history[i] = job->first_ID;
  440. job->progress_times[i] = job->start_time;
  441. }
  442. /* reset libdb cache stats */
  443. job->inst->inst_cache_hits = job->inst->inst_cache_misses = 0;
  444. }
/*
 * Sample the BDB memory-pool statistics for this instance and return
 * the cache hit ratio (hits / (hits + misses)) over the interval since
 * the previous call; running totals are cached on the instance so each
 * call yields a delta-based ratio.  Returns 0.0 on the first sample or
 * when stats are unavailable.
 */
static double import_grok_db_stats(ldbm_instance *inst)
{
    DB_MPOOL_STAT *mpstat = NULL;
    DB_MPOOL_FSTAT **mpfstat = NULL;
    int return_value = -1;
    double cache_hit_ratio = 0.0;

    return_value = dblayer_memp_stat_instance(inst, &mpstat, &mpfstat);
    if (!mpstat) {
        goto out;
    }
    if (0 == return_value) {
        unsigned long current_cache_hits = mpstat->st_cache_hit;
        unsigned long current_cache_misses = mpstat->st_cache_miss;

        /* inst_cache_hits == 0 marks the first sample; two samples are
         * needed before a delta-based ratio is meaningful */
        if (inst->inst_cache_hits) {
            unsigned long hit_delta, miss_delta;

            hit_delta = current_cache_hits - inst->inst_cache_hits;
            miss_delta = current_cache_misses - inst->inst_cache_misses;
            if (hit_delta != 0) {
                cache_hit_ratio = (double)hit_delta /
                    (double)(hit_delta + miss_delta);
            }
        }
        inst->inst_cache_misses = current_cache_misses;
        inst->inst_cache_hits = current_cache_hits;
    }
out:
    if (mpstat)
        slapi_ch_free((void **)&mpstat);
    if (mpfstat) {
#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR + DB_VERSION_PATCH <= 3204
        /* In DB 3.2.4 and earlier, we need to free each element */
        DB_MPOOL_FSTAT **tfsp;
        for (tfsp = mpfstat; *tfsp; tfsp++)
            slapi_ch_free((void **)tfsp);
#endif
        slapi_ch_free((void **)&mpfstat);
    }
    return cache_hit_ratio;
}
  484. static char* import_decode_worker_state(int state)
  485. {
  486. switch (state) {
  487. case WAITING:
  488. return "W";
  489. case RUNNING:
  490. return "R";
  491. case FINISHED:
  492. return "F";
  493. case ABORTED:
  494. return "A";
  495. default:
  496. return "?";
  497. }
  498. }
  499. static void import_print_worker_status(ImportWorkerInfo *info)
  500. {
  501. char *name = (info->work_type == PRODUCER ? "Producer" :
  502. (info->work_type == FOREMAN ? "Foreman" :
  503. info->index_info->name));
  504. import_log_status_add_line(info->job,
  505. "%-25s %s%10ld %7.1f", name,
  506. import_decode_worker_state(info->state),
  507. info->last_ID_processed, info->rate);
  508. }
  509. #define IMPORT_CHUNK_TEST_HOLDOFF_TIME (5*60) /* Seconds */
  510. /* Got to be lower than this: */
  511. #define IMPORT_CHUNK_TEST_CACHE_HIT_RATIO (0.99)
  512. /* Less than half as fast as we were doing: */
  513. #define IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A (0.5)
  514. /* A lot less fast than we were doing: */
  515. #define IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B (0.1)
/*
 * Decide whether to end the current import pass early ("throw in the
 * towel").  Returns 1 to end the pass, 0 to keep going.
 *
 * With an explicit merge_chunk_size (the secret -c option) the test is
 * purely "has the slowest worker advanced that many IDs".  Otherwise a
 * heuristic fires only after the hold-off time, when the recent
 * progress rate has dropped below a fraction of the average rate AND
 * the cache hit ratio confirms the cache is the bottleneck.
 *
 * NOTE(review): number_of_times_here is function-static, so this is
 * only safe called from a single (monitor) thread; it is also reset on
 * every call that reaches the bottom.  Also, since RATIO_B (0.1) <
 * RATIO_A (0.5), the else-branch condition `ratio < RATIO_B` can never
 * hold when `ratio < RATIO_A` was false — the "plummeted" path looks
 * unreachable as written; confirm intent before relying on it.
 */
static int import_throw_in_towel(ImportJob *job, time_t current_time,
                                 ID trailing_ID)
{
    static int number_of_times_here = 0;

    /* secret -c option allows specific chunk size to be set... */
    if (job->merge_chunk_size != 0) {
        if ((0 != job->lead_ID) &&
            (trailing_ID > job->first_ID) &&
            (trailing_ID - job->first_ID > job->merge_chunk_size)) {
            return 1;
        }
        return 0;
    }
    /* Check stats to decide whether we're getting bogged down and should
     * terminate this pass.
     */

    /* Check #1 : are we more than 10 minutes into the chunk ? */
    if (current_time - job->start_time > IMPORT_CHUNK_TEST_HOLDOFF_TIME) {
        /* Check #2 : Have we slowed down considerably recently ? */
        if ((job->recent_progress_rate / job->average_progress_rate) <
            IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A) {
            /* Check #3: Cache performing poorly---the puported reason
             * for the slowdown */
            if (job->cache_hit_ratio < IMPORT_CHUNK_TEST_CACHE_HIT_RATIO) {
                /* We have a winner ! */
                import_log_notice(job, "Decided to end this pass because "
                                  "the progress rate has dropped below "
                                  "the %.0f%% threshold.",
                                  IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A*100.0);
                return 1;
            }
        } else {
            if ((job->recent_progress_rate / job->average_progress_rate) <
                IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B) {
                /* Alternative check: have we really, really slowed down,
                 * without the test for cache overflow? */
                /* This is designed to catch the case where the cache has
                 * been misconfigured too large */
                if (number_of_times_here > 10) {
                    /* Got to get here ten times at least */
                    import_log_notice(job, "Decided to end this pass "
                                      "because the progress rate "
                                      "plummeted below %.0f%%",
                                      IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B*100.0);
                    return 1;
                }
                number_of_times_here++;
            }
        }
    }
    number_of_times_here = 0;
    return 0;
}
  569. static void import_push_progress_history(ImportJob *job, ID current_id,
  570. time_t current_time)
  571. {
  572. int i = 0;
  573. for (i = 0; i < IMPORT_JOB_PROG_HISTORY_SIZE - 1; i++) {
  574. job->progress_history[i] = job->progress_history[i+1];
  575. job->progress_times[i] = job->progress_times[i+1];
  576. }
  577. job->progress_history[i] = current_id;
  578. job->progress_times[i] = current_time;
  579. }
  580. static void import_calc_rate(ImportWorkerInfo *info, int time_interval)
  581. {
  582. size_t ids = info->last_ID_processed - info->previous_ID_counted;
  583. double rate = (double)ids / time_interval;
  584. if ( (info->previous_ID_counted != 0) && (info->last_ID_processed != 0) ) {
  585. info->rate = rate;
  586. } else {
  587. info->rate = 0;
  588. }
  589. info->previous_ID_counted = info->last_ID_processed;
  590. }
  591. /* find the rate (ids/time) of work from a worker thread between history
  592. * marks A and B.
  593. */
  594. #define HISTORY(N) (job->progress_history[N])
  595. #define TIMES(N) (job->progress_times[N])
  596. #define PROGRESS(A, B) ((HISTORY(B) > HISTORY(A)) ? \
  597. ((double)(HISTORY(B) - HISTORY(A)) / \
  598. (double)(TIMES(B) - TIMES(A))) : \
  599. (double)0)
  600. static int import_monitor_threads(ImportJob *job, int *status)
  601. {
  602. PRIntervalTime tenthsecond = PR_MillisecondsToInterval(100);
  603. ImportWorkerInfo *current_worker = NULL;
  604. ImportWorkerInfo *producer = NULL, *foreman = NULL;
  605. int finished = 0;
  606. int giveup = 0;
  607. int count = 1; /* 1 to prevent premature status report */
  608. int producer_done = 0;
  609. const int display_interval = 200;
  610. time_t time_now = 0;
  611. time_t last_time = 0;
  612. time_t time_interval = 0;
  613. int rc = 0;
  614. int corestate = 0;
  615. for (current_worker = job->worker_list; current_worker != NULL;
  616. current_worker = current_worker->next) {
  617. current_worker->command = RUN;
  618. if (current_worker->work_type == PRODUCER)
  619. producer = current_worker;
  620. if (current_worker->work_type == FOREMAN)
  621. foreman = current_worker;
  622. }
  623. if (job->flags & FLAG_USE_FILES)
  624. PR_ASSERT(producer != NULL);
  625. PR_ASSERT(foreman != NULL);
  626. if (!foreman) {
  627. goto error_abort;
  628. }
  629. time(&last_time);
  630. job->start_time = last_time;
  631. import_clear_progress_history(job);
  632. while (!finished) {
  633. ID trailing_ID = NOID;
  634. DS_Sleep(tenthsecond);
  635. finished = 1;
  636. /* First calculate the time interval since last reported */
  637. if (0 == (count % display_interval)) {
  638. time(&time_now);
  639. time_interval = time_now - last_time;
  640. last_time = time_now;
  641. /* Now calculate our rate of progress overall for this chunk */
  642. if (time_now != job->start_time) {
  643. /* log a cute chart of the worker progress */
  644. import_log_status_start(job);
  645. import_log_status_add_line(job,
  646. "Index status for import of %s:", job->inst->inst_name);
  647. import_log_status_add_line(job,
  648. "-------Index Task-------State---Entry----Rate-");
  649. import_push_progress_history(job, foreman->last_ID_processed,
  650. time_now);
  651. job->average_progress_rate =
  652. (double)(HISTORY(IMPORT_JOB_PROG_HISTORY_SIZE-1)+1 - foreman->first_ID) /
  653. (double)(TIMES(IMPORT_JOB_PROG_HISTORY_SIZE-1) - job->start_time);
  654. job->recent_progress_rate =
  655. PROGRESS(0, IMPORT_JOB_PROG_HISTORY_SIZE-1);
  656. job->cache_hit_ratio = import_grok_db_stats(job->inst);
  657. }
  658. }
  659. for (current_worker = job->worker_list; current_worker != NULL;
  660. current_worker = current_worker->next) {
  661. /* Calculate the ID at which the slowest worker is currently
  662. * processing */
  663. if ((trailing_ID > current_worker->last_ID_processed) &&
  664. (current_worker->work_type == WORKER)) {
  665. trailing_ID = current_worker->last_ID_processed;
  666. }
  667. if (0 == (count % display_interval) && time_interval) {
  668. import_calc_rate(current_worker, time_interval);
  669. import_print_worker_status(current_worker);
  670. }
  671. corestate = current_worker->state & CORESTATE;
  672. if (current_worker->state == ABORTED) {
  673. goto error_abort;
  674. } else if ((corestate == QUIT) || (corestate == FINISHED)) {
  675. if (DN_NORM_BT == (DN_NORM_BT & current_worker->state)) {
  676. /* upgrading dn norm (both) is needed */
  677. rc = NEED_DN_NORM_BT; /* Set the RC; Don't abort now;
  678. * We have to stop other
  679. * threads */
  680. } else if (DN_NORM == (DN_NORM_BT & current_worker->state)) {
  681. /* upgrading dn norm is needed */
  682. rc = NEED_DN_NORM; /* Set the RC; Don't abort now;
  683. * We have to stop other threads
  684. */
  685. } else if (DN_NORM_SP == (DN_NORM_BT & current_worker->state)) {
  686. /* upgrading spaces in dn norm is needed */
  687. rc = NEED_DN_NORM_SP; /* Set the RC; Don't abort now;
  688. * We have to stop other
  689. * threads */
  690. }
  691. current_worker->state = corestate;
  692. } else if (current_worker->state != FINISHED) {
  693. finished = 0;
  694. }
  695. }
  696. if ((0 == (count % display_interval)) &&
  697. (job->start_time != time_now)) {
  698. char buffer[256], *p = buffer;
  699. import_log_status_done(job);
  700. p += sprintf(p, "Processed %lu entries ", (u_long)job->ready_ID);
  701. if (job->total_pass > 1)
  702. p += sprintf(p, "(pass %d) ", job->total_pass);
  703. p += sprintf(p, "-- average rate %.1f/sec, ",
  704. job->average_progress_rate);
  705. p += sprintf(p, "recent rate %.1f/sec, ",
  706. job->recent_progress_rate);
  707. p += sprintf(p, "hit ratio %.0f%%", job->cache_hit_ratio * 100.0);
  708. import_log_notice(job, "%s", buffer);
  709. }
  710. /* Then let's see if it's time to complete this import pass */
  711. if (!giveup) {
  712. giveup = import_throw_in_towel(job, time_now, trailing_ID);
  713. if (giveup) {
  714. /* If so, signal the lead thread to stop */
  715. import_log_notice(job, "Ending pass number %d ...",
  716. job->total_pass);
  717. foreman->command = STOP;
  718. while (foreman->state != FINISHED) {
  719. DS_Sleep(tenthsecond);
  720. }
  721. import_log_notice(job, "Foreman is done; waiting for "
  722. "workers to finish...");
  723. }
  724. }
  725. /* if the producer is finished, and the foreman has caught up... */
  726. if (producer) {
  727. producer_done = (producer->state == FINISHED) ||
  728. (producer->state == QUIT);
  729. } else {
  730. /* set in ldbm_back_wire_import */
  731. producer_done = (job->flags & FLAG_PRODUCER_DONE);
  732. }
  733. if (producer_done && (job->lead_ID == job->ready_ID)) {
  734. /* tell the foreman to stop if he's still working. */
  735. if (foreman->state != FINISHED)
  736. foreman->command = STOP;
  737. /* if all the workers are caught up too, we're done */
  738. if (trailing_ID == job->lead_ID)
  739. break;
  740. }
  741. /* if the foreman is done (end of pass) and the worker threads
  742. * have caught up...
  743. */
  744. if ((foreman->state == FINISHED) && (job->ready_ID == trailing_ID)) {
  745. break;
  746. }
  747. count++;
  748. }
  749. import_log_notice(job, "Workers finished; cleaning up...");
  750. /* Now tell all the workers to stop */
  751. for (current_worker = job->worker_list; current_worker != NULL;
  752. current_worker = current_worker->next) {
  753. if (current_worker->work_type != PRODUCER)
  754. current_worker->command = STOP;
  755. }
  756. /* Having done that, wait for them to say that they've stopped */
  757. for (current_worker = job->worker_list; current_worker != NULL; ) {
  758. if ((current_worker->state != FINISHED) &&
  759. (current_worker->state != ABORTED) &&
  760. (current_worker->state != QUIT) &&
  761. (current_worker->work_type != PRODUCER)) {
  762. DS_Sleep(tenthsecond); /* Only sleep if we hit a thread that is still not done */
  763. continue;
  764. } else {
  765. current_worker = current_worker->next;
  766. }
  767. }
  768. import_log_notice(job, "Workers cleaned up.");
  769. /* If we're here and giveup is true, and the primary hadn't finished
  770. * processing the input files, we need to return IMPORT_INCOMPLETE_PASS */
  771. if (giveup && (job->input_filenames || (job->flags & FLAG_ONLINE) ||
  772. (job->flags & FLAG_REINDEXING /* support multi-pass */))) {
  773. if (producer_done && (job->ready_ID == job->lead_ID)) {
  774. /* foreman caught up with the producer, and the producer is
  775. * done.
  776. */
  777. *status = IMPORT_COMPLETE_PASS;
  778. } else {
  779. *status = IMPORT_INCOMPLETE_PASS;
  780. }
  781. } else {
  782. *status = IMPORT_COMPLETE_PASS;
  783. }
  784. return rc;
  785. error_abort:
  786. return ERR_IMPORT_ABORTED;
  787. }
  788. /********** running passes **********/
  789. static int import_run_pass(ImportJob *job, int *status)
  790. {
  791. int ret = 0;
  792. /* Start the threads running */
  793. ret = import_start_threads(job);
  794. if (ret != 0) {
  795. import_log_notice(job, "Starting threads failed: %d\n", ret);
  796. goto error;
  797. }
  798. /* Monitor the threads until we're done or fail */
  799. ret = import_monitor_threads(job, status);
  800. if ((ret == ERR_IMPORT_ABORTED) || (ret == NEED_DN_NORM) ||
  801. (ret == NEED_DN_NORM_SP) || (ret == NEED_DN_NORM_BT)) {
  802. import_log_notice(job, "Thread monitoring returned: %d\n", ret);
  803. goto error;
  804. } else if (ret != 0) {
  805. import_log_notice(job, "Thread monitoring aborted: %d\n", ret);
  806. goto error;
  807. }
  808. error:
  809. return ret;
  810. }
  811. static void import_set_abort_flag_all(ImportJob *job, int wait_for_them)
  812. {
  813. ImportWorkerInfo *worker;
  814. /* tell all the worker threads to abort */
  815. job->flags |= FLAG_ABORT;
  816. /* setting of the flag in the job will be detected in the worker, foreman
  817. * threads and if there are any threads which have a sleeptime 200 msecs
  818. * = import_sleep_time; after that time, they will examine the condition
  819. * (job->flags & FLAG_ABORT) which will unblock the thread to proceed to
  820. * abort. Hence, we will sleep here for atleast 3 sec to make sure clean
  821. * up occurs */
  822. /* allow all the aborts to be processed */
  823. DS_Sleep(PR_MillisecondsToInterval(3000));
  824. if (wait_for_them) {
  825. /* Having done that, wait for them to say that they've stopped */
  826. for (worker = job->worker_list; worker != NULL; ) {
  827. DS_Sleep(PR_MillisecondsToInterval(100));
  828. if ((worker->state != FINISHED) && (worker->state != ABORTED) &&
  829. (worker->state != QUIT)){
  830. continue;
  831. } else {
  832. worker = worker->next;
  833. }
  834. }
  835. }
  836. }
  837. /* tell all the threads to abort */
  838. void import_abort_all(ImportJob *job, int wait_for_them)
  839. {
  840. ImportWorkerInfo *worker;
  841. /* tell all the worker threads to abort */
  842. job->flags |= FLAG_ABORT;
  843. for (worker = job->worker_list; worker; worker = worker->next)
  844. worker->command = ABORT;
  845. if (wait_for_them) {
  846. /* Having done that, wait for them to say that they've stopped */
  847. for (worker = job->worker_list; worker != NULL; ) {
  848. DS_Sleep(PR_MillisecondsToInterval(100));
  849. if ((worker->state != FINISHED) && (worker->state != ABORTED) &&
  850. (worker->state != QUIT)) {
  851. continue;
  852. } else {
  853. worker = worker->next;
  854. }
  855. }
  856. }
  857. }
  858. /* Helper function to make up filenames */
  859. int import_make_merge_filenames(char *directory, char *indexname, int pass,
  860. char **oldname, char **newname)
  861. {
  862. /* Filenames look like this: attributename<LDBM_FILENAME_SUFFIX>
  863. and need to be renamed to: attributename<LDBM_FILENAME_SUFFIX>.n
  864. where n is the pass number.
  865. */
  866. *oldname = slapi_ch_smprintf("%s/%s%s", directory, indexname, LDBM_FILENAME_SUFFIX);
  867. *newname = slapi_ch_smprintf("%s/%s.%d%s", directory, indexname, pass,
  868. LDBM_FILENAME_SUFFIX);
  869. if (!*oldname || !*newname) {
  870. slapi_ch_free_string(oldname);
  871. slapi_ch_free_string(newname);
  872. return -1;
  873. }
  874. return 0;
  875. }
  876. /* Task here is as follows:
  877. * First, if this is pass #1, check for the presence of a merge
  878. * directory. If it is not present, create it.
  879. * If it is present, delete all the files in it.
  880. * Then, flush the dblayer and close files.
  881. * Now create a numbered subdir of the merge directory for this pass.
  882. * Next, move the index files, except entrydn, parentid and id2entry to
  883. * the merge subdirectory. Important to move if we can, because
  884. * that can be millions of times faster than a copy.
  885. * Finally open the dblayer back up because the caller expects
  886. * us to not muck with it.
  887. */
  888. static int import_sweep_after_pass(ImportJob *job)
  889. {
  890. backend *be = job->inst->inst_be;
  891. int ret = 0;
  892. import_log_notice(job, "Sweeping files for merging later...");
  893. ret = dblayer_instance_close(be);
  894. if (0 == ret) {
  895. /* Walk the list of index jobs */
  896. ImportWorkerInfo *current_worker = NULL;
  897. for (current_worker = job->worker_list; current_worker != NULL;
  898. current_worker = current_worker->next) {
  899. /* Foreach job, rename the file to <filename>.n, where n is the
  900. * pass number */
  901. if ((current_worker->work_type != FOREMAN) &&
  902. (current_worker->work_type != PRODUCER) &&
  903. (strcasecmp(current_worker->index_info->name, LDBM_PARENTID_STR) != 0)) {
  904. char *newname = NULL;
  905. char *oldname = NULL;
  906. ret = import_make_merge_filenames(job->inst->inst_dir_name,
  907. current_worker->index_info->name, job->current_pass,
  908. &oldname, &newname);
  909. if (0 != ret) {
  910. break;
  911. }
  912. if (PR_Access(oldname, PR_ACCESS_EXISTS) == PR_SUCCESS) {
  913. ret = PR_Rename(oldname, newname);
  914. if (ret != PR_SUCCESS) {
  915. PRErrorCode prerr = PR_GetError();
  916. import_log_notice(job, "Failed to rename file \"%s\" to \"%s\", "
  917. SLAPI_COMPONENT_NAME_NSPR " error %d (%s)",
  918. oldname, newname, prerr, slapd_pr_strerror(prerr));
  919. slapi_ch_free( (void**)&newname);
  920. slapi_ch_free( (void**)&oldname);
  921. break;
  922. }
  923. }
  924. slapi_ch_free( (void**)&newname);
  925. slapi_ch_free( (void**)&oldname);
  926. }
  927. }
  928. ret = dblayer_instance_start(be, DBLAYER_IMPORT_MODE);
  929. }
  930. if (0 == ret) {
  931. import_log_notice(job, "Sweep done.");
  932. } else {
  933. if (ENOSPC == ret) {
  934. import_log_notice(job, "ERROR: NO DISK SPACE LEFT in sweep phase");
  935. } else {
  936. import_log_notice(job, "ERROR: Sweep phase error %d (%s)", ret,
  937. dblayer_strerror(ret));
  938. }
  939. }
  940. return ret;
  941. }
  942. /* when the import is done, this function is called to bring stuff back up.
  943. * returns 0 on success; anything else is an error
  944. */
  945. static int import_all_done(ImportJob *job, int ret)
  946. {
  947. ldbm_instance *inst = job->inst;
  948. /* Writing this file indicates to future server startups that
  949. * the db is OK unless it's in the dry run mode. */
  950. if ((ret == 0) && !(job->flags & FLAG_DRYRUN)) {
  951. char inst_dir[MAXPATHLEN*2];
  952. char *inst_dirp = NULL;
  953. inst_dirp = dblayer_get_full_inst_dir(inst->inst_li, inst,
  954. inst_dir, MAXPATHLEN*2);
  955. ret = dbversion_write(inst->inst_li, inst_dirp, NULL, DBVERSION_ALL);
  956. if (inst_dirp != inst_dir)
  957. slapi_ch_free_string(&inst_dirp);
  958. }
  959. if ((job->task != NULL) && (0 == slapi_task_get_refcount(job->task))) {
  960. slapi_task_finish(job->task, ret);
  961. }
  962. if (job->flags & FLAG_ONLINE) {
  963. /* make sure the indexes are online as well */
  964. /* richm 20070919 - if index entries are added online, they
  965. are created and marked as INDEX_OFFLINE, in anticipation
  966. of someone doing a db2index. In this case, the db2index
  967. code will correctly unset the INDEX_OFFLINE flag.
  968. However, if import is used to create the indexes, the
  969. INDEX_OFFLINE flag will not be cleared. So, we do that
  970. here
  971. */
  972. IndexInfo *index = job->index_list;
  973. while (index != NULL) {
  974. index->ai->ai_indexmask &= ~INDEX_OFFLINE;
  975. index = index->next;
  976. }
  977. /* start up the instance */
  978. ret = dblayer_instance_start(job->inst->inst_be, DBLAYER_NORMAL_MODE);
  979. if (ret != 0)
  980. return ret;
  981. /* Reset USN slapi_counter with the last key of the entryUSN index */
  982. ldbm_set_last_usn(inst->inst_be);
  983. /* bring backend online again */
  984. slapi_mtn_be_enable(inst->inst_be);
  985. }
  986. return ret;
  987. }
/*
 * Main driver for an import / reindex / upgradedn job.  `arg` is an
 * (ImportJob *).  Runs producer/worker passes until the job completes or
 * aborts, rebuilds the derived indexes (numsubordinates, ancestorid),
 * flushes and closes the database, logs a summary, and reports the final
 * status to any associated cn=tasks entry.
 * Returns 0 on success; for dryrun/upgradedn jobs the dn-norm codes are
 * remapped to 1/2/3 (see the final status block below); other failures
 * return a non-zero error code.
 */
int import_main_offline(void *arg)
{
    ImportJob *job = (ImportJob *)arg;
    ldbm_instance *inst = job->inst;
    backend *be = inst->inst_be;
    int ret = 0;
    time_t beginning = 0;
    time_t end = 0;
    int finished = 0;
    int status = 0;
    int verbose = 1;
    int aborted = 0;
    ImportWorkerInfo *producer = NULL;
    char *opstr = "Import";        /* human-readable job name used in logs */

    if (job->task)
        slapi_task_inc_refcount(job->task);

    /* Pick the log label that matches the flavour of this job. */
    if (job->flags & (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1)) {
        if (job->flags & FLAG_DRYRUN) {
            opstr = "Upgrade Dn Dryrun";
        } else if ((job->flags & (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1))
                   == (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1)) {
            opstr = "Upgrade Dn (Full)";
        } else if (job->flags & FLAG_UPGRADEDNFORMAT_V1) {
            opstr = "Upgrade Dn (Spaces)";
        } else {
            opstr = "Upgrade Dn (RFC 4514)";
        }
    } else if (job->flags & FLAG_REINDEXING) {
        opstr = "Reindexing";
    }
    PR_ASSERT(inst != NULL);
    time(&beginning);

    /* Decide which indexes are needed */
    if (job->flags & FLAG_INDEX_ATTRS) {
        /* Here, we get an AVL tree which contains nodes for all attributes
         * in the schema. Given this tree, we need to identify those nodes
         * which are marked for indexing. */
        avl_apply(job->inst->inst_attrs, (IFP)import_attr_callback,
                  (caddr_t)job, -1, AVL_INORDER);
        vlv_getindices((IFP)import_attr_callback, (void *)job, be);
    }

    /* Determine how much index buffering space to allocate to each index */
    import_set_index_buffer_size(job);

    /* initialize the entry FIFO */
    ret = import_fifo_init(job);
    if (ret) {
        if (! (job->flags & FLAG_USE_FILES)) {
            /* wire import: wake the caller blocked on wire_cv before bailing */
            PR_Lock(job->wire_lock);
            PR_NotifyCondVar(job->wire_cv);
            PR_Unlock(job->wire_lock);
        }
        goto error;
    }

    if (job->flags & FLAG_USE_FILES) {
        /* importing from files: start up a producer thread to read the
         * files and queue them
         */
        producer = CALLOC(ImportWorkerInfo);
        if (! producer)
            goto error;
        /* start the producer */
        import_init_worker_info(producer, job);
        producer->work_type = PRODUCER;
        /* Three producer flavours: upgradedn, reindex, or plain LDIF import. */
        if (job->flags & (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1))
        {
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)upgradedn_producer,
                                producer, PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY,
                          "unable to spawn upgrade dn producer thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                goto error;
            }
        }
        else if (job->flags & FLAG_REINDEXING)
        {
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)index_producer, producer,
                                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD,
                                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY,
                          "unable to spawn index producer thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                goto error;
            }
        }
        else
        {
            import_log_notice(job, "Beginning import job...");
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_producer, producer,
                                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD,
                                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY,
                          "unable to spawn import producer thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                goto error;
            }
        }
        if (0 == job->job_index_buffer_suggestion)
            import_log_notice(job, "Index buffering is disabled.");
        else
            import_log_notice(job,
                              "Index buffering enabled with bucket size %lu",
                              (long unsigned int)job->job_index_buffer_suggestion);
        job->worker_list = producer;
    } else {
        /* release the startup lock and let the entries start queueing up
         * in for import */
        PR_Lock(job->wire_lock);
        PR_NotifyCondVar(job->wire_cv);
        PR_Unlock(job->wire_lock);
    }

    /* Run as many passes as we need to complete the job or die honourably in
     * the attempt */
    while (! finished) {
        job->current_pass++;
        job->total_pass++;
        ret = import_run_pass(job, &status);
        /* The following could have happened:
         * (a) Some error happened such that we're hosed.
         *     This is indicated by a non-zero return code.
         * (b) We finished the complete file without needing a second pass
         *     This is indicated by a zero return code and a status of
         *     IMPORT_COMPLETE_PASS and current_pass == 1;
         * (c) We completed a pass and need at least another one
         *     This is indicated by a zero return code and a status of
         *     IMPORT_INCOMPLETE_PASS
         * (d) We just completed what turned out to be the last in a
         *     series of passes
         *     This is indicated by a zero return code and a status of
         *     IMPORT_COMPLETE_PASS and current_pass > 1
         */
        if (ret == ERR_IMPORT_ABORTED) {
            /* at least one of the threads has aborted -- shut down ALL
             * of the threads */
            import_log_notice(job, "Aborting all %s threads...", opstr);
            /* this abort sets the abort flag on the threads and will block for
             * the exit of all threads
             */
            import_set_abort_flag_all(job, 1);
            import_log_notice(job, "%s threads aborted.", opstr);
            aborted = 1;
            goto error;
        }
        if ((ret == NEED_DN_NORM) || (ret == NEED_DN_NORM_SP) ||
            (ret == NEED_DN_NORM_BT)) {
            /* dn-format upgrade required: propagate the code via `ret` */
            goto error;
        } else if (0 != ret) {
            /* Some horrible fate has befallen the import */
            import_log_notice(job, "Fatal pass error %d", ret);
            goto error;
        }
        /* No error, but a number of possibilities */
        if ( IMPORT_COMPLETE_PASS == status ) {
            if (1 == job->current_pass) {
                /* We're done !!!! */ ;
            } else {
                /* Save the files, then merge */
                ret = import_sweep_after_pass(job);
                if (0 != ret) {
                    goto error;
                }
                ret = import_mega_merge(job);
                if (0 != ret) {
                    goto error;
                }
            }
            finished = 1;
        } else {
            if (IMPORT_INCOMPLETE_PASS == status) {
                /* Need to go round again */
                /* Time to save the files we've built for later */
                ret = import_sweep_after_pass(job);
                if (0 != ret) {
                    goto error;
                }
                /* merge early if we've accumulated too many passes' files */
                if ( (inst->inst_li->li_maxpassbeforemerge != 0) &&
                     (job->current_pass > inst->inst_li->li_maxpassbeforemerge) )
                {
                    ret = import_mega_merge(job);
                    if (0 != ret) {
                        goto error;
                    }
                    job->current_pass = 1;
                    ret = import_sweep_after_pass(job);
                    if (0 != ret) {
                        goto error;
                    }
                }
                /* Fixup the first_ID value to reflect previous work */
                job->first_ID = job->ready_ID + 1;
                import_free_thread_data(job);
                job->worker_list = producer;
                import_log_notice(job, "Beginning pass number %d",
                                  job->total_pass+1);
            } else {
                /* Bizarro-slapd */
                goto error;
            }
        }
    }

    /* kill the producer now; we're done */
    if (producer) {
        import_log_notice(job, "Cleaning up producer thread...");
        producer->command = STOP;
        /* wait for the lead thread to stop */
        while (producer->state != FINISHED) {
            DS_Sleep(PR_MillisecondsToInterval(100));
        }
    }

    import_log_notice(job, "Indexing complete. Post-processing...");
    /* Now do the numsubordinates attribute */
    /* [610066] reindexed db cannot be used in the following backup/restore */
    if ( (!(job->flags & FLAG_REINDEXING) || (job->flags & FLAG_DN2RDN)) &&
         (ret = update_subordinatecounts(be, job->mothers, job->encrypt, NULL))
         != 0 ) {
        import_log_notice(job, "Failed to update numsubordinates attributes");
        goto error;
    }
    import_log_notice(job, "Generating numSubordinates complete.");

    if (!entryrdn_get_noancestorid()) {
        /* And the ancestorid index */
        /* Creating ancestorid from the scratch; delete the index file first. */
        struct attrinfo *ai = NULL;

        ainfo_get(be, "ancestorid", &ai);
        dblayer_erase_index_file(be, ai, 0);
        if ((ret = ldbm_ancestorid_create_index(be)) != 0) {
            import_log_notice(job, "Failed to create ancestorid index");
            goto error;
        }
    }

    import_log_notice(job, "Flushing caches...");
    if (0 != (ret = dblayer_flush(job->inst->inst_li)) ) {
        import_log_notice(job, "Failed to flush database");
        goto error;
    }

    /* New way to exit the routine: check the return code.
     * If it's non-zero, delete the database files.
     * Otherwise don't, but always close the database layer properly.
     * Then return. This ensures that we can't make a half-good/half-bad
     * Database. */
error:
    /* If we fail, the database is now in a mess, so we delete it
       except dry run mode */
    import_log_notice(job, "Closing files...");
    cache_clear(&job->inst->inst_cache, CACHE_TYPE_ENTRY);
    if (entryrdn_get_switch()) {
        cache_clear(&job->inst->inst_dncache, CACHE_TYPE_DN);
    }
    if (aborted) {
        /* If aborted, it's safer to rebuild the caches. */
        cache_destroy_please(&job->inst->inst_cache, CACHE_TYPE_ENTRY);
        if (entryrdn_get_switch()) { /* subtree-rename: on */
            cache_destroy_please(&job->inst->inst_dncache, CACHE_TYPE_DN);
        }
        /* initialize the entry cache */
        if (! cache_init(&(inst->inst_cache), DEFAULT_CACHE_SIZE,
                         DEFAULT_CACHE_ENTRIES, CACHE_TYPE_ENTRY)) {
            LDAPDebug0Args(LDAP_DEBUG_ANY, "import_main_offline: "
                           "cache_init failed. Server should be restarted.\n");
        }
        /* initialize the dn cache */
        if (! cache_init(&(inst->inst_dncache), DEFAULT_DNCACHE_SIZE,
                         DEFAULT_DNCACHE_MAXCOUNT, CACHE_TYPE_DN)) {
            LDAPDebug0Args(LDAP_DEBUG_ANY, "import_main_offline: "
                           "dn cache_init failed. Server should be restarted.\n");
        }
    }
    if (0 != ret) {
        if (!(job->flags & (FLAG_DRYRUN|FLAG_UPGRADEDNFORMAT_V1))) {
            /* If not dryrun NOR upgradedn space */
            /* if running in the dry run mode, don't touch the db */
            dblayer_delete_instance_dir(be);
        }
        dblayer_instance_close(job->inst->inst_be);
    } else {
        if (0 != (ret = dblayer_instance_close(job->inst->inst_be)) ) {
            import_log_notice(job, "Failed to close database");
        }
    }
    if (!(job->flags & FLAG_ONLINE))
        dblayer_close(job->inst->inst_li, DBLAYER_IMPORT_MODE);

    time(&end);
    /* On success, log throughput statistics for the whole job. */
    if (verbose && (0 == ret)) {
        int seconds_to_import = end - beginning;
        size_t entries_processed = job->lead_ID - (job->starting_ID - 1);
        double entries_per_second =
            seconds_to_import ?
            (double)entries_processed / (double)seconds_to_import : 0;

        if (job->not_here_skipped) {
            if (job->skipped) {
                import_log_notice(job,
                                  "%s complete. Processed %lu entries "
                                  "(%d bad entries were skipped, "
                                  "%d entries were skipped because they don't "
                                  "belong to this database) in %d seconds. "
                                  "(%.2f entries/sec)",
                                  opstr, (long unsigned int)entries_processed,
                                  job->skipped, job->not_here_skipped,
                                  seconds_to_import, entries_per_second);
            } else {
                import_log_notice(job,
                                  "%s complete. Processed %lu entries "
                                  "(%d entries were skipped because they don't "
                                  "belong to this database) "
                                  "in %d seconds. (%.2f entries/sec)",
                                  opstr, (long unsigned int)entries_processed,
                                  job->not_here_skipped, seconds_to_import,
                                  entries_per_second);
            }
        } else {
            if (job->skipped) {
                import_log_notice(job,
                                  "%s complete. Processed %lu entries "
                                  "(%d were skipped) in %d seconds. "
                                  "(%.2f entries/sec)",
                                  opstr, (long unsigned int)entries_processed,
                                  job->skipped, seconds_to_import,
                                  entries_per_second);
            } else {
                import_log_notice(job,
                                  "%s complete. Processed %lu entries "
                                  "in %d seconds. (%.2f entries/sec)",
                                  opstr, (long unsigned int)entries_processed,
                                  seconds_to_import, entries_per_second);
            }
        }
    }

    /* For dryrun/upgradedn jobs, remap the dn-norm codes to the task's
     * documented exit values: 0 up-to-date, 1 needs full upgrade,
     * 2 needs upgradednformat, 3 needs upgradednformat spaces, -1 error. */
    if (job->flags & (FLAG_DRYRUN|FLAG_UPGRADEDNFORMAT_V1)) {
        if (0 == ret) {
            import_log_notice(job, "%s complete. %s is up-to-date.",
                              opstr, job->inst->inst_name);
            ret = 0;
            if (job->task) {
                slapi_task_dec_refcount(job->task);
            }
            import_all_done(job, ret);
        } else if (NEED_DN_NORM_BT == ret) {
            import_log_notice(job, "%s complete. %s needs upgradednformat all.",
                              opstr, job->inst->inst_name);
            if (job->task) {
                slapi_task_dec_refcount(job->task);
            }
            import_all_done(job, ret);
            ret = 1;
        } else if (NEED_DN_NORM == ret) {
            import_log_notice(job, "%s complete. %s needs upgradednformat.",
                              opstr, job->inst->inst_name);
            if (job->task) {
                slapi_task_dec_refcount(job->task);
            }
            import_all_done(job, ret);
            ret = 2;
        } else if (NEED_DN_NORM_SP == ret) {
            import_log_notice(job,
                              "%s complete. %s needs upgradednformat spaces.",
                              opstr, job->inst->inst_name);
            if (job->task) {
                slapi_task_dec_refcount(job->task);
            }
            import_all_done(job, ret);
            ret = 3;
        } else {
            ret = -1;
            if (job->task != NULL) {
                slapi_task_finish(job->task, ret);
            }
        }
    } else if (0 != ret) {
        import_log_notice(job, "%s failed.", opstr);
        if (job->task != NULL) {
            slapi_task_finish(job->task, ret);
        }
    } else {
        if (job->task) {
            slapi_task_dec_refcount(job->task);
        }
        import_all_done(job, ret);
    }

    /* This instance isn't busy anymore */
    instance_set_not_busy(job->inst);
    /* NOTE(review): job->task is read below after import_free_job(job); this
     * is only safe if import_free_job() frees the job's members but not the
     * ImportJob struct itself, and leaves job->task intact -- confirm. */
    import_free_job(job);
    if (!job->task) {
        FREE(job);
    }
    if (producer)
        FREE(producer);
    return(ret);
}
  1384. /*
  1385. * to be called by online import using PR_CreateThread()
  1386. * offline import directly calls import_main_offline()
  1387. *
  1388. */
  1389. void import_main(void *arg)
  1390. {
  1391. import_main_offline(arg);
  1392. }
  1393. int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb)
  1394. {
  1395. backend *be = NULL;
  1396. int noattrindexes = 0;
  1397. ImportJob *job = NULL;
  1398. char **name_array = NULL;
  1399. int total_files, i;
  1400. int up_flags = 0;
  1401. PRThread *thread = NULL;
  1402. job = CALLOC(ImportJob);
  1403. if (job == NULL) {
  1404. LDAPDebug(LDAP_DEBUG_ANY, "not enough memory to do import job\n",
  1405. 0, 0, 0);
  1406. return -1;
  1407. }
  1408. slapi_pblock_get( pb, SLAPI_BACKEND, &be);
  1409. PR_ASSERT(NULL != be);
  1410. job->inst = (ldbm_instance *)be->be_instance_info;
  1411. slapi_pblock_get( pb, SLAPI_LDIF2DB_NOATTRINDEXES, &noattrindexes );
  1412. slapi_pblock_get( pb, SLAPI_LDIF2DB_FILE, &name_array );
  1413. slapi_pblock_get(pb, SLAPI_SEQ_TYPE, &up_flags); /* For upgrade dn and
  1414. dn2rdn */
  1415. /* the removedupvals field is blatantly overloaded here to mean
  1416. * the chunk size too. (chunk size = number of entries that should
  1417. * be imported before starting a new pass. usually for debugging.)
  1418. */
  1419. slapi_pblock_get(pb, SLAPI_LDIF2DB_REMOVEDUPVALS, &job->merge_chunk_size);
  1420. if (job->merge_chunk_size == 1)
  1421. job->merge_chunk_size = 0;
  1422. /* get list of specifically included and/or excluded subtrees from
  1423. * the front-end */
  1424. ldbm_back_fetch_incl_excl(pb, &job->include_subtrees,
  1425. &job->exclude_subtrees);
  1426. /* get cn=tasks info, if any */
  1427. slapi_pblock_get(pb, SLAPI_BACKEND_TASK, &job->task);
  1428. slapi_pblock_get(pb, SLAPI_LDIF2DB_ENCRYPT, &job->encrypt);
  1429. /* get uniqueid info */
  1430. slapi_pblock_get(pb, SLAPI_LDIF2DB_GENERATE_UNIQUEID, &job->uuid_gen_type);
  1431. if (job->uuid_gen_type == SLAPI_UNIQUEID_GENERATE_NAME_BASED) {
  1432. char *namespaceid;
  1433. slapi_pblock_get(pb, SLAPI_LDIF2DB_NAMESPACEID, &namespaceid);
  1434. job->uuid_namespace = slapi_ch_strdup(namespaceid);
  1435. }
  1436. job->flags = FLAG_USE_FILES;
  1437. if (NULL == name_array) { /* no ldif file is given -> reindexing or
  1438. upgradedn */
  1439. if (up_flags & (SLAPI_UPGRADEDNFORMAT|SLAPI_UPGRADEDNFORMAT_V1)) {
  1440. if (up_flags & SLAPI_UPGRADEDNFORMAT) {
  1441. job->flags |= FLAG_UPGRADEDNFORMAT;
  1442. }
  1443. if (up_flags & SLAPI_UPGRADEDNFORMAT_V1) {
  1444. job->flags |= FLAG_UPGRADEDNFORMAT_V1;
  1445. }
  1446. if (up_flags & SLAPI_DRYRUN) {
  1447. job->flags |= FLAG_DRYRUN;
  1448. }
  1449. } else {
  1450. job->flags |= FLAG_REINDEXING; /* call index_producer */
  1451. if (up_flags & SLAPI_UPGRADEDB_DN2RDN) {
  1452. if (entryrdn_get_switch()) {
  1453. job->flags |= FLAG_DN2RDN; /* migrate to the rdn format */
  1454. } else {
  1455. LDAPDebug1Arg(LDAP_DEBUG_ANY,
  1456. "DN to RDN option is specified, "
  1457. "but %s is not enabled\n",
  1458. CONFIG_ENTRYRDN_SWITCH);
  1459. import_free_job(job);
  1460. FREE(job);
  1461. return -1;
  1462. }
  1463. }
  1464. }
  1465. }
  1466. if (!noattrindexes) {
  1467. job->flags |= FLAG_INDEX_ATTRS;
  1468. }
  1469. for (i = 0; name_array && name_array[i] != NULL; i++) {
  1470. charray_add(&job->input_filenames, slapi_ch_strdup(name_array[i]));
  1471. }
  1472. job->starting_ID = 1;
  1473. job->first_ID = 1;
  1474. job->mothers = CALLOC(import_subcount_stuff);
  1475. /* how much space should we allocate to index buffering? */
  1476. job->job_index_buffer_size = import_get_index_buffer_size();
  1477. if (job->job_index_buffer_size == 0) {
  1478. /* 10% of the allocated cache size + one meg */
  1479. PR_Lock(job->inst->inst_li->li_config_mutex);
  1480. job->job_index_buffer_size =
  1481. (job->inst->inst_li->li_import_cachesize/10) + (1024*1024);
  1482. PR_Unlock(job->inst->inst_li->li_config_mutex);
  1483. }
  1484. import_subcount_stuff_init(job->mothers);
  1485. if (job->task != NULL) {
  1486. /* count files, use that to track "progress" in cn=tasks */
  1487. total_files = 0;
  1488. while (name_array && name_array[total_files] != NULL)
  1489. total_files++;
  1490. /* add 1 to account for post-import cleanup (which can take a
  1491. * significant amount of time)
  1492. */
  1493. /* NGK - This should eventually be cleaned up to use the public
  1494. * task API. */
  1495. if (0 == total_files) { /* reindexing */
  1496. job->task->task_work = 2;
  1497. } else {
  1498. job->task->task_work = total_files + 1;
  1499. }
  1500. job->task->task_progress = 0;
  1501. job->task->task_state = SLAPI_TASK_RUNNING;
  1502. slapi_task_set_data(job->task, job);
  1503. slapi_task_set_destructor_fn(job->task, import_task_destroy);
  1504. slapi_task_set_cancel_fn(job->task, import_task_abort);
  1505. job->flags |= FLAG_ONLINE;
  1506. /* create thread for import_main, so we can return */
  1507. thread = PR_CreateThread(PR_USER_THREAD, import_main, (void *)job,
  1508. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
  1509. PR_UNJOINABLE_THREAD,
  1510. SLAPD_DEFAULT_THREAD_STACKSIZE);
  1511. if (thread == NULL) {
  1512. PRErrorCode prerr = PR_GetError();
  1513. LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import thread, "
  1514. SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  1515. prerr, slapd_pr_strerror(prerr), 0);
  1516. import_free_job(job);
  1517. FREE(job);
  1518. return -2;
  1519. }
  1520. return 0;
  1521. }
  1522. /* old style -- do it all synchronously (THIS IS GOING AWAY SOON) */
  1523. return import_main_offline((void *)job);
  1524. }