/* import.c */
  1. /** BEGIN COPYRIGHT BLOCK
  2. * This Program is free software; you can redistribute it and/or modify it under
  3. * the terms of the GNU General Public License as published by the Free Software
  4. * Foundation; version 2 of the License.
  5. *
  6. * This Program is distributed in the hope that it will be useful, but WITHOUT
  7. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  8. * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  9. *
  10. * You should have received a copy of the GNU General Public License along with
  11. * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
  12. * Place, Suite 330, Boston, MA 02111-1307 USA.
  13. *
  14. * In addition, as a special exception, Red Hat, Inc. gives You the additional
  15. * right to link the code of this Program with code not covered under the GNU
  16. * General Public License ("Non-GPL Code") and to distribute linked combinations
  17. * including the two, subject to the limitations in this paragraph. Non-GPL Code
  18. * permitted under this exception must only link to the code of this Program
  19. * through those well defined interfaces identified in the file named EXCEPTION
  20. * found in the source code files (the "Approved Interfaces"). The files of
  21. * Non-GPL Code may instantiate templates or use macros or inline functions from
  22. * the Approved Interfaces without causing the resulting work to be covered by
  23. * the GNU General Public License. Only Red Hat, Inc. may make changes or
  24. * additions to the list of Approved Interfaces. You must obey the GNU General
  25. * Public License in all respects for all of the Program code and other code used
  26. * in conjunction with the Program except the Non-GPL Code covered by this
  27. * exception. If you modify this file, you may extend this exception to your
  28. * version of the file, but you are not obligated to do so. If you do not wish to
  29. * provide this exception without modification, you must delete this exception
  30. * statement from your version and license this file solely under the GPL without
  31. * exception.
  32. *
  33. *
  34. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  35. * Copyright (C) 2005 Red Hat, Inc.
  36. * All rights reserved.
  37. * END COPYRIGHT BLOCK **/
  38. #ifdef HAVE_CONFIG_H
  39. # include <config.h>
  40. #endif
  41. /*
  42. * the "new" ("deluxe") backend import code
  43. *
  44. * please make sure you use 4-space indentation on this file.
  45. */
  46. #include "back-ldbm.h"
  47. #include "vlv_srch.h"
  48. #include "import.h"
  49. #define ERR_IMPORT_ABORTED -23
  50. #define NEED_DN_NORM -24
  51. #define NEED_DN_NORM_SP -25
  52. #define NEED_DN_NORM_BT -26
  53. /********** routines to manipulate the entry fifo **********/
  54. /* this is pretty bogus -- could be a HUGE amount of memory */
  55. /* Not anymore with the Import Queue Adaptative Algorithm (Regulation) */
  56. #define MAX_FIFO_SIZE 8000
  57. static int import_fifo_init(ImportJob *job)
  58. {
  59. ldbm_instance *inst = job->inst;
  60. /* Work out how big the entry fifo can be */
  61. if (inst->inst_cache.c_maxentries > 0)
  62. job->fifo.size = inst->inst_cache.c_maxentries;
  63. else
  64. job->fifo.size = inst->inst_cache.c_maxsize / 1024; /* guess */
  65. /* byte limit that should be respected to avoid memory starvation */
  66. /* conservative computing: multiply by .8 to allow for reasonable overflow */
  67. job->fifo.bsize = (inst->inst_cache.c_maxsize/10) << 3;
  68. job->fifo.c_bsize = 0;
  69. if (job->fifo.size > MAX_FIFO_SIZE)
  70. job->fifo.size = MAX_FIFO_SIZE;
  71. /* has to be at least 1 or 2, and anything less than about 100 destroys
  72. * the point of doing all this optimization in the first place. */
  73. if (job->fifo.size < 100)
  74. job->fifo.size = 100;
  75. /* Get memory for the entry fifo */
  76. /* This is used to keep a ref'ed pointer to the last <cachesize>
  77. * processed entries */
  78. PR_ASSERT(NULL == job->fifo.item);
  79. job->fifo.item = (FifoItem *)slapi_ch_calloc(job->fifo.size,
  80. sizeof(FifoItem));
  81. if (NULL == job->fifo.item) {
  82. /* Memory allocation error */
  83. return -1;
  84. }
  85. return 0;
  86. }
  87. FifoItem *import_fifo_fetch(ImportJob *job, ID id, int worker)
  88. {
  89. int idx = id % job->fifo.size;
  90. FifoItem *fi;
  91. if (job->fifo.item) {
  92. fi = &(job->fifo.item[idx]);
  93. } else {
  94. return NULL;
  95. }
  96. if (fi->entry) {
  97. if (worker) {
  98. if (fi->bad) {
  99. if (fi->bad == FIFOITEM_BAD) {
  100. fi->bad = FIFOITEM_BAD_PRINTED;
  101. if (!(job->flags & FLAG_UPGRADEDNFORMAT_V1)) {
  102. import_log_notice(job, "WARNING: bad entry: ID %d", id);
  103. }
  104. }
  105. return NULL;
  106. }
  107. PR_ASSERT(fi->entry->ep_refcnt > 0);
  108. }
  109. }
  110. return fi;
  111. }
  112. static void import_fifo_destroy(ImportJob *job)
  113. {
  114. /* Free any entries in the fifo first */
  115. struct backentry *be = NULL;
  116. size_t i = 0;
  117. for (i = 0; i < job->fifo.size; i++) {
  118. be = job->fifo.item[i].entry;
  119. backentry_free(&be);
  120. job->fifo.item[i].entry = NULL;
  121. job->fifo.item[i].filename = NULL;
  122. }
  123. slapi_ch_free((void **)&job->fifo.item);
  124. job->fifo.item = NULL;
  125. }
  126. /********** logging stuff **********/
  127. #define LOG_BUFFER 512
  128. /* this changes the 'nsTaskStatus' value, which is transient (anything logged
  129. * here wipes out any previous status)
  130. */
  131. static void import_log_status_start(ImportJob *job)
  132. {
  133. if (! job->task_status)
  134. job->task_status = (char *)slapi_ch_malloc(10 * LOG_BUFFER);
  135. if (! job->task_status)
  136. return; /* out of memory? */
  137. job->task_status[0] = 0;
  138. }
  139. static void import_log_status_add_line(ImportJob *job, char *format, ...)
  140. {
  141. va_list ap;
  142. int len = 0;
  143. if (! job->task_status)
  144. return;
  145. len = strlen(job->task_status);
  146. if (len + 5 > (10 * LOG_BUFFER))
  147. return; /* no room */
  148. if (job->task_status[0])
  149. strcat(job->task_status, "\n");
  150. va_start(ap, format);
  151. PR_vsnprintf(job->task_status + len, (10 * LOG_BUFFER) - len, format, ap);
  152. va_end(ap);
  153. }
  154. static void import_log_status_done(ImportJob *job)
  155. {
  156. if (job->task) {
  157. slapi_task_log_status(job->task, "%s", job->task_status);
  158. }
  159. }
  160. /* this adds a line to the 'nsTaskLog' value, which is cumulative (anything
  161. * logged here is added to the end)
  162. */
  163. void import_log_notice(ImportJob *job, char *format, ...)
  164. {
  165. va_list ap;
  166. char buffer[LOG_BUFFER];
  167. va_start(ap, format);
  168. PR_vsnprintf(buffer, LOG_BUFFER, format, ap);
  169. va_end(ap);
  170. if (job->task) {
  171. slapi_task_log_notice(job->task, "%s", buffer);
  172. }
  173. /* also save it in the logs for posterity */
  174. if (job->flags & (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1)) {
  175. LDAPDebug(LDAP_DEBUG_ANY, "upgradedn %s: %s\n", job->inst->inst_name,
  176. buffer, 0);
  177. } else if (job->flags & FLAG_REINDEXING) {
  178. LDAPDebug(LDAP_DEBUG_ANY, "reindex %s: %s\n", job->inst->inst_name,
  179. buffer, 0);
  180. } else {
  181. LDAPDebug(LDAP_DEBUG_ANY, "import %s: %s\n", job->inst->inst_name,
  182. buffer, 0);
  183. }
  184. }
  185. static void import_task_destroy(Slapi_Task *task)
  186. {
  187. ImportJob *job = (ImportJob *)slapi_task_get_data(task);
  188. if (job && job->task_status) {
  189. slapi_ch_free((void **)&job->task_status);
  190. job->task_status = NULL;
  191. }
  192. FREE(job);
  193. slapi_task_set_data(task, NULL);
  194. }
  195. static void import_task_abort(Slapi_Task *task)
  196. {
  197. ImportJob *job;
  198. /* don't log anything from here, because we're still holding the
  199. * DSE lock for modify...
  200. */
  201. if (slapi_task_get_state(task) == SLAPI_TASK_FINISHED) {
  202. /* too late */
  203. }
  204. /*
  205. * Race condition.
  206. * If the import thread happens to finish right now we're in trouble
  207. * because it will free the job.
  208. */
  209. job = (ImportJob *)slapi_task_get_data(task);
  210. import_abort_all(job, 0);
  211. while (slapi_task_get_state(task) != SLAPI_TASK_FINISHED)
  212. DS_Sleep(PR_MillisecondsToInterval(100));
  213. }
  214. /********** helper functions for importing **********/
  215. /* Function used to gather a list of indexed attrs */
  216. static int import_attr_callback(void *node, void *param)
  217. {
  218. ImportJob *job = (ImportJob *)param;
  219. struct attrinfo *a = (struct attrinfo *)node;
  220. if (job->flags & FLAG_DRYRUN) { /* dryrun; we don't need the workers */
  221. return 0;
  222. }
  223. if (job->flags & (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1)) {
  224. /* Bring up import workers just for indexes having DN syntax
  225. * attribute type. (except entrydn -- taken care below) */
  226. int rc = 0;
  227. Slapi_Attr attr = {0};
  228. /*
  229. * Treat cn and ou specially. Bring up the import workers for
  230. * cn and ou even though they are not DN syntax attribute.
  231. * This is done because they have some exceptional case to store
  232. * DN format in the admin entries such as UserPreferences.
  233. */
  234. if ((0 == PL_strcasecmp("cn", a->ai_type)) ||
  235. (0 == PL_strcasecmp("commonname", a->ai_type)) ||
  236. (0 == PL_strcasecmp("ou", a->ai_type)) ||
  237. (0 == PL_strcasecmp("organizationalUnit", a->ai_type))) {
  238. ;
  239. } else {
  240. slapi_attr_init(&attr, a->ai_type);
  241. rc = slapi_attr_is_dn_syntax_attr(&attr);
  242. attr_done(&attr);
  243. if (0 == rc) {
  244. return 0;
  245. }
  246. }
  247. }
  248. /* OK, so we now have hold of the attribute structure and the job info,
  249. * let's see what we have. Remember that although this function is called
  250. * many times, all these calls are in the context of a single thread, so we
  251. * don't need to worry about protecting the data in the job structure.
  252. */
  253. /* We need to specifically exclude the (entrydn, entryrdn) & parentid &
  254. * ancestorid indexes because we build those in the foreman thread.
  255. */
  256. if (IS_INDEXED(a->ai_indexmask) &&
  257. (strcasecmp(a->ai_type, LDBM_ENTRYDN_STR) != 0) &&
  258. (strcasecmp(a->ai_type, LDBM_ENTRYRDN_STR) != 0) &&
  259. (strcasecmp(a->ai_type, LDBM_PARENTID_STR) != 0) &&
  260. (strcasecmp(a->ai_type, LDBM_ANCESTORID_STR) != 0) &&
  261. (strcasecmp(a->ai_type, numsubordinates) != 0)) {
  262. /* Make an import_index_info structure, fill it in and insert into the
  263. * job's list */
  264. IndexInfo *info = CALLOC(IndexInfo);
  265. if (NULL == info) {
  266. /* Memory allocation error */
  267. return -1;
  268. }
  269. info->name = slapi_ch_strdup(a->ai_type);
  270. info->ai = a;
  271. if (NULL == info->name) {
  272. /* Memory allocation error */
  273. FREE(info);
  274. return -1;
  275. }
  276. info->next = job->index_list;
  277. job->index_list = info;
  278. job->number_indexers++;
  279. }
  280. return 0;
  281. }
  282. static void import_set_index_buffer_size(ImportJob *job)
  283. {
  284. IndexInfo *current_index = NULL;
  285. size_t substring_index_count = 0;
  286. size_t proposed_size = 0;
  287. /* Count the substring indexes we have */
  288. for (current_index = job->index_list; current_index != NULL;
  289. current_index = current_index->next) {
  290. if (current_index->ai->ai_indexmask & INDEX_SUB) {
  291. substring_index_count++;
  292. }
  293. }
  294. if (substring_index_count > 0) {
  295. /* Make proposed size such that if all substring indices were
  296. * reasonably full, we'd hit the target space */
  297. proposed_size = (job->job_index_buffer_size / substring_index_count) /
  298. IMPORT_INDEX_BUFFER_SIZE_CONSTANT;
  299. if (proposed_size > IMPORT_MAX_INDEX_BUFFER_SIZE) {
  300. proposed_size = IMPORT_MAX_INDEX_BUFFER_SIZE;
  301. }
  302. if (proposed_size < IMPORT_MIN_INDEX_BUFFER_SIZE) {
  303. proposed_size = 0;
  304. }
  305. }
  306. job->job_index_buffer_suggestion = proposed_size;
  307. }
  308. static void import_free_thread_data(ImportJob *job)
  309. {
  310. /* DBDB free the lists etc */
  311. ImportWorkerInfo *worker = job->worker_list;
  312. while (worker != NULL) {
  313. ImportWorkerInfo *asabird = worker;
  314. worker = worker->next;
  315. if (asabird->work_type != PRODUCER)
  316. slapi_ch_free( (void**)&asabird);
  317. }
  318. }
  319. void import_free_job(ImportJob *job)
  320. {
  321. /* DBDB free the lists etc */
  322. IndexInfo *index = job->index_list;
  323. import_free_thread_data(job);
  324. while (index != NULL) {
  325. IndexInfo *asabird = index;
  326. index = index->next;
  327. slapi_ch_free( (void**)&asabird->name);
  328. slapi_ch_free( (void**)&asabird);
  329. }
  330. job->index_list = NULL;
  331. if (NULL != job->mothers) {
  332. import_subcount_stuff_term(job->mothers);
  333. slapi_ch_free( (void**)&job->mothers);
  334. }
  335. ldbm_back_free_incl_excl(job->include_subtrees, job->exclude_subtrees);
  336. charray_free(job->input_filenames);
  337. if (job->fifo.size)
  338. import_fifo_destroy(job);
  339. if (NULL != job->uuid_namespace)
  340. slapi_ch_free((void **)&job->uuid_namespace);
  341. if (job->wire_lock)
  342. PR_DestroyLock(job->wire_lock);
  343. if (job->wire_cv)
  344. PR_DestroyCondVar(job->wire_cv);
  345. slapi_ch_free((void **)&job->task_status);
  346. }
  347. /* determine if we are the correct backend for this entry
  348. * (in a distributed suffix, some entries may be for other backends).
  349. * if the entry's dn actually matches one of the suffixes of the be, we
  350. * automatically take it as a belonging one, for such entries must be
  351. * present in EVERY backend independently of the distribution applied.
  352. */
  353. int import_entry_belongs_here(Slapi_Entry *e, backend *be)
  354. {
  355. Slapi_Backend *retbe;
  356. Slapi_DN *sdn = slapi_entry_get_sdn(e);
  357. if (slapi_be_issuffix(be, sdn))
  358. return 1;
  359. retbe = slapi_mapping_tree_find_backend_for_sdn(sdn);
  360. return (retbe == be);
  361. }
  362. /********** starting threads and stuff **********/
  363. /* Solaris is weird---we need an LWP per thread but NSPR doesn't give us
  364. * one unless we make this magic belshe-call */
  365. /* Fixed on Solaris 8; NSPR supports PR_GLOBAL_BOUND_THREAD */
  366. #define CREATE_THREAD PR_CreateThread
  367. static void import_init_worker_info(ImportWorkerInfo *info, ImportJob *job)
  368. {
  369. info->command = PAUSE;
  370. info->job = job;
  371. info->first_ID = job->first_ID;
  372. info->index_buffer_size = job->job_index_buffer_suggestion;
  373. }
  374. static int import_start_threads(ImportJob *job)
  375. {
  376. IndexInfo *current_index = NULL;
  377. ImportWorkerInfo *foreman = NULL, *worker = NULL;
  378. foreman = CALLOC(ImportWorkerInfo);
  379. if (!foreman)
  380. goto error;
  381. /* start the foreman */
  382. import_init_worker_info(foreman, job);
  383. foreman->work_type = FOREMAN;
  384. if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_foreman, foreman,
  385. PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
  386. PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
  387. PRErrorCode prerr = PR_GetError();
  388. LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import foreman thread, "
  389. SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  390. prerr, slapd_pr_strerror(prerr), 0);
  391. FREE(foreman);
  392. goto error;
  393. }
  394. foreman->next = job->worker_list;
  395. job->worker_list = foreman;
  396. /* Start follower threads, if we are doing attribute indexing */
  397. current_index = job->index_list;
  398. if (job->flags & FLAG_INDEX_ATTRS) {
  399. while (current_index) {
  400. /* make a new thread info structure */
  401. worker = CALLOC(ImportWorkerInfo);
  402. if (! worker)
  403. goto error;
  404. /* fill it in */
  405. import_init_worker_info(worker, job);
  406. worker->index_info = current_index;
  407. worker->work_type = WORKER;
  408. /* Start the thread */
  409. if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_worker, worker,
  410. PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
  411. PR_UNJOINABLE_THREAD,
  412. SLAPD_DEFAULT_THREAD_STACKSIZE)) {
  413. PRErrorCode prerr = PR_GetError();
  414. LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import worker thread, "
  415. SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  416. prerr, slapd_pr_strerror(prerr), 0);
  417. FREE(worker);
  418. goto error;
  419. }
  420. /* link it onto the job's thread list */
  421. worker->next = job->worker_list;
  422. job->worker_list = worker;
  423. current_index = current_index->next;
  424. }
  425. }
  426. return 0;
  427. error:
  428. import_log_notice(job, "Import thread creation failed.");
  429. import_log_notice(job, "Aborting all import threads...");
  430. import_abort_all(job, 1);
  431. import_log_notice(job, "Import threads aborted.");
  432. return -1;
  433. }
  434. /********** monitoring the worker threads **********/
  435. static void import_clear_progress_history(ImportJob *job)
  436. {
  437. int i = 0;
  438. for (i = 0; i < IMPORT_JOB_PROG_HISTORY_SIZE /*- 1*/; i++) {
  439. job->progress_history[i] = job->first_ID;
  440. job->progress_times[i] = job->start_time;
  441. }
  442. /* reset libdb cache stats */
  443. job->inst->inst_cache_hits = job->inst->inst_cache_misses = 0;
  444. }
  445. static double import_grok_db_stats(ldbm_instance *inst)
  446. {
  447. DB_MPOOL_STAT *mpstat = NULL;
  448. DB_MPOOL_FSTAT **mpfstat = NULL;
  449. int return_value = -1;
  450. double cache_hit_ratio = 0.0;
  451. return_value = dblayer_memp_stat_instance(inst, &mpstat, &mpfstat);
  452. if (!mpstat) {
  453. goto out;
  454. }
  455. if (0 == return_value) {
  456. unsigned long current_cache_hits = mpstat->st_cache_hit;
  457. unsigned long current_cache_misses = mpstat->st_cache_miss;
  458. if (inst->inst_cache_hits) {
  459. unsigned long hit_delta, miss_delta;
  460. hit_delta = current_cache_hits - inst->inst_cache_hits;
  461. miss_delta = current_cache_misses - inst->inst_cache_misses;
  462. if (hit_delta != 0) {
  463. cache_hit_ratio = (double)hit_delta /
  464. (double)(hit_delta + miss_delta);
  465. }
  466. }
  467. inst->inst_cache_misses = current_cache_misses;
  468. inst->inst_cache_hits = current_cache_hits;
  469. }
  470. out:
  471. if (mpstat)
  472. slapi_ch_free((void **)&mpstat);
  473. if (mpfstat) {
  474. #if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR + DB_VERSION_PATCH <= 3204
  475. /* In DB 3.2.4 and earlier, we need to free each element */
  476. DB_MPOOL_FSTAT **tfsp;
  477. for (tfsp = mpfstat; *tfsp; tfsp++)
  478. slapi_ch_free((void **)tfsp);
  479. #endif
  480. slapi_ch_free((void **)&mpfstat);
  481. }
  482. return cache_hit_ratio;
  483. }
  484. static char* import_decode_worker_state(int state)
  485. {
  486. switch (state) {
  487. case WAITING:
  488. return "W";
  489. case RUNNING:
  490. return "R";
  491. case FINISHED:
  492. return "F";
  493. case ABORTED:
  494. return "A";
  495. default:
  496. return "?";
  497. }
  498. }
  499. static void import_print_worker_status(ImportWorkerInfo *info)
  500. {
  501. char *name = (info->work_type == PRODUCER ? "Producer" :
  502. (info->work_type == FOREMAN ? "Foreman" :
  503. info->index_info->name));
  504. import_log_status_add_line(info->job,
  505. "%-25s %s%10ld %7.1f", name,
  506. import_decode_worker_state(info->state),
  507. info->last_ID_processed, info->rate);
  508. }
  509. #define IMPORT_CHUNK_TEST_HOLDOFF_TIME (5*60) /* Seconds */
  510. /* Got to be lower than this: */
  511. #define IMPORT_CHUNK_TEST_CACHE_HIT_RATIO (0.99)
  512. /* Less than half as fast as we were doing: */
  513. #define IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A (0.5)
  514. /* A lot less fast than we were doing: */
  515. #define IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B (0.1)
  516. static int import_throw_in_towel(ImportJob *job, time_t current_time,
  517. ID trailing_ID)
  518. {
  519. static int number_of_times_here = 0;
  520. /* secret -c option allows specific chunk size to be set... */
  521. if (job->merge_chunk_size != 0) {
  522. if ((0 != job->lead_ID) &&
  523. (trailing_ID > job->first_ID) &&
  524. (trailing_ID - job->first_ID > job->merge_chunk_size)) {
  525. return 1;
  526. }
  527. return 0;
  528. }
  529. /* Check stats to decide whether we're getting bogged down and should
  530. * terminate this pass.
  531. */
  532. /* Check #1 : are we more than 10 minutes into the chunk ? */
  533. if (current_time - job->start_time > IMPORT_CHUNK_TEST_HOLDOFF_TIME) {
  534. /* Check #2 : Have we slowed down considerably recently ? */
  535. if ((job->recent_progress_rate / job->average_progress_rate) <
  536. IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A) {
  537. /* Check #3: Cache performing poorly---the puported reason
  538. * for the slowdown */
  539. if (job->cache_hit_ratio < IMPORT_CHUNK_TEST_CACHE_HIT_RATIO) {
  540. /* We have a winner ! */
  541. import_log_notice(job, "Decided to end this pass because "
  542. "the progress rate has dropped below "
  543. "the %.0f%% threshold.",
  544. IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_A*100.0);
  545. return 1;
  546. }
  547. } else {
  548. if ((job->recent_progress_rate / job->average_progress_rate) <
  549. IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B) {
  550. /* Alternative check: have we really, really slowed down,
  551. * without the test for cache overflow? */
  552. /* This is designed to catch the case where the cache has
  553. * been misconfigured too large */
  554. if (number_of_times_here > 10) {
  555. /* Got to get here ten times at least */
  556. import_log_notice(job, "Decided to end this pass "
  557. "because the progress rate "
  558. "plummeted below %.0f%%",
  559. IMPORT_CHUNK_TEST_SLOWDOWN_RATIO_B*100.0);
  560. return 1;
  561. }
  562. number_of_times_here++;
  563. }
  564. }
  565. }
  566. number_of_times_here = 0;
  567. return 0;
  568. }
  569. static void import_push_progress_history(ImportJob *job, ID current_id,
  570. time_t current_time)
  571. {
  572. int i = 0;
  573. for (i = 0; i < IMPORT_JOB_PROG_HISTORY_SIZE - 1; i++) {
  574. job->progress_history[i] = job->progress_history[i+1];
  575. job->progress_times[i] = job->progress_times[i+1];
  576. }
  577. job->progress_history[i] = current_id;
  578. job->progress_times[i] = current_time;
  579. }
  580. static void import_calc_rate(ImportWorkerInfo *info, int time_interval)
  581. {
  582. size_t ids = info->last_ID_processed - info->previous_ID_counted;
  583. double rate = (double)ids / time_interval;
  584. if ( (info->previous_ID_counted != 0) && (info->last_ID_processed != 0) ) {
  585. info->rate = rate;
  586. } else {
  587. info->rate = 0;
  588. }
  589. info->previous_ID_counted = info->last_ID_processed;
  590. }
  591. /* find the rate (ids/time) of work from a worker thread between history
  592. * marks A and B.
  593. */
  594. #define HISTORY(N) (job->progress_history[N])
  595. #define TIMES(N) (job->progress_times[N])
  596. #define PROGRESS(A, B) ((HISTORY(B) > HISTORY(A)) ? \
  597. ((double)(HISTORY(B) - HISTORY(A)) / \
  598. (double)(TIMES(B) - TIMES(A))) : \
  599. (double)0)
  600. static int import_monitor_threads(ImportJob *job, int *status)
  601. {
  602. PRIntervalTime tenthsecond = PR_MillisecondsToInterval(100);
  603. ImportWorkerInfo *current_worker = NULL;
  604. ImportWorkerInfo *producer = NULL, *foreman = NULL;
  605. int finished = 0;
  606. int giveup = 0;
  607. int count = 1; /* 1 to prevent premature status report */
  608. int producer_done = 0;
  609. const int display_interval = 200;
  610. time_t time_now = 0;
  611. time_t last_time = 0;
  612. time_t time_interval = 0;
  613. int rc = 0;
  614. int corestate = 0;
  615. for (current_worker = job->worker_list; current_worker != NULL;
  616. current_worker = current_worker->next) {
  617. current_worker->command = RUN;
  618. if (current_worker->work_type == PRODUCER)
  619. producer = current_worker;
  620. if (current_worker->work_type == FOREMAN)
  621. foreman = current_worker;
  622. }
  623. if (job->flags & FLAG_USE_FILES)
  624. PR_ASSERT(producer != NULL);
  625. PR_ASSERT(foreman != NULL);
  626. if (!foreman) {
  627. goto error_abort;
  628. }
  629. time(&last_time);
  630. job->start_time = last_time;
  631. import_clear_progress_history(job);
  632. while (!finished) {
  633. ID trailing_ID = NOID;
  634. DS_Sleep(tenthsecond);
  635. finished = 1;
  636. /* First calculate the time interval since last reported */
  637. if (0 == (count % display_interval)) {
  638. time(&time_now);
  639. time_interval = time_now - last_time;
  640. last_time = time_now;
  641. /* Now calculate our rate of progress overall for this chunk */
  642. if (time_now != job->start_time) {
  643. /* log a cute chart of the worker progress */
  644. import_log_status_start(job);
  645. import_log_status_add_line(job,
  646. "Index status for import of %s:", job->inst->inst_name);
  647. import_log_status_add_line(job,
  648. "-------Index Task-------State---Entry----Rate-");
  649. import_push_progress_history(job, foreman->last_ID_processed,
  650. time_now);
  651. job->average_progress_rate =
  652. (double)(HISTORY(IMPORT_JOB_PROG_HISTORY_SIZE-1)+1 - foreman->first_ID) /
  653. (double)(TIMES(IMPORT_JOB_PROG_HISTORY_SIZE-1) - job->start_time);
  654. job->recent_progress_rate =
  655. PROGRESS(0, IMPORT_JOB_PROG_HISTORY_SIZE-1);
  656. job->cache_hit_ratio = import_grok_db_stats(job->inst);
  657. }
  658. }
  659. for (current_worker = job->worker_list; current_worker != NULL;
  660. current_worker = current_worker->next) {
  661. /* Calculate the ID at which the slowest worker is currently
  662. * processing */
  663. if ((trailing_ID > current_worker->last_ID_processed) &&
  664. (current_worker->work_type == WORKER)) {
  665. trailing_ID = current_worker->last_ID_processed;
  666. }
  667. if (0 == (count % display_interval) && time_interval) {
  668. import_calc_rate(current_worker, time_interval);
  669. import_print_worker_status(current_worker);
  670. }
  671. corestate = current_worker->state & CORESTATE;
  672. if (current_worker->state == ABORTED) {
  673. goto error_abort;
  674. } else if ((corestate == QUIT) || (corestate == FINISHED)) {
  675. if (DN_NORM_BT == (DN_NORM_BT & current_worker->state)) {
  676. /* upgrading dn norm (both) is needed */
  677. rc = NEED_DN_NORM_BT; /* Set the RC; Don't abort now;
  678. * We have to stop other
  679. * threads */
  680. } else if (DN_NORM == (DN_NORM_BT & current_worker->state)) {
  681. /* upgrading dn norm is needed */
  682. rc = NEED_DN_NORM; /* Set the RC; Don't abort now;
  683. * We have to stop other threads
  684. */
  685. } else if (DN_NORM_SP == (DN_NORM_BT & current_worker->state)) {
  686. /* upgrading spaces in dn norm is needed */
  687. rc = NEED_DN_NORM_SP; /* Set the RC; Don't abort now;
  688. * We have to stop other
  689. * threads */
  690. }
  691. current_worker->state = corestate;
  692. } else if (current_worker->state != FINISHED) {
  693. finished = 0;
  694. }
  695. }
  696. if ((0 == (count % display_interval)) &&
  697. (job->start_time != time_now)) {
  698. char buffer[256], *p = buffer;
  699. import_log_status_done(job);
  700. p += sprintf(p, "Processed %lu entries ", (u_long)job->ready_ID);
  701. if (job->total_pass > 1)
  702. p += sprintf(p, "(pass %d) ", job->total_pass);
  703. p += sprintf(p, "-- average rate %.1f/sec, ",
  704. job->average_progress_rate);
  705. p += sprintf(p, "recent rate %.1f/sec, ",
  706. job->recent_progress_rate);
  707. p += sprintf(p, "hit ratio %.0f%%", job->cache_hit_ratio * 100.0);
  708. import_log_notice(job, "%s", buffer);
  709. }
  710. /* Then let's see if it's time to complete this import pass */
  711. if (!giveup) {
  712. giveup = import_throw_in_towel(job, time_now, trailing_ID);
  713. if (giveup) {
  714. /* If so, signal the lead thread to stop */
  715. import_log_notice(job, "Ending pass number %d ...",
  716. job->total_pass);
  717. foreman->command = STOP;
  718. while (foreman->state != FINISHED) {
  719. DS_Sleep(tenthsecond);
  720. }
  721. import_log_notice(job, "Foreman is done; waiting for "
  722. "workers to finish...");
  723. }
  724. }
  725. /* if the producer is finished, and the foreman has caught up... */
  726. if (producer) {
  727. producer_done = (producer->state == FINISHED) ||
  728. (producer->state == QUIT);
  729. } else {
  730. /* set in ldbm_back_wire_import */
  731. producer_done = (job->flags & FLAG_PRODUCER_DONE);
  732. }
  733. if (producer_done && (job->lead_ID == job->ready_ID)) {
  734. /* tell the foreman to stop if he's still working. */
  735. if (foreman->state != FINISHED)
  736. foreman->command = STOP;
  737. /* if all the workers are caught up too, we're done */
  738. if (trailing_ID == job->lead_ID)
  739. break;
  740. }
  741. /* if the foreman is done (end of pass) and the worker threads
  742. * have caught up...
  743. */
  744. if ((foreman->state == FINISHED) && (job->ready_ID == trailing_ID)) {
  745. break;
  746. }
  747. count++;
  748. }
  749. import_log_notice(job, "Workers finished; cleaning up...");
  750. /* Now tell all the workers to stop */
  751. for (current_worker = job->worker_list; current_worker != NULL;
  752. current_worker = current_worker->next) {
  753. if (current_worker->work_type != PRODUCER)
  754. current_worker->command = STOP;
  755. }
  756. /* Having done that, wait for them to say that they've stopped */
  757. for (current_worker = job->worker_list; current_worker != NULL; ) {
  758. if ((current_worker->state != FINISHED) &&
  759. (current_worker->state != ABORTED) &&
  760. (current_worker->state != QUIT) &&
  761. (current_worker->work_type != PRODUCER)) {
  762. DS_Sleep(tenthsecond); /* Only sleep if we hit a thread that is still not done */
  763. continue;
  764. } else {
  765. current_worker = current_worker->next;
  766. }
  767. }
  768. import_log_notice(job, "Workers cleaned up.");
  769. /* If we're here and giveup is true, and the primary hadn't finished
  770. * processing the input files, we need to return IMPORT_INCOMPLETE_PASS */
  771. if (giveup && (job->input_filenames || (job->flags & FLAG_ONLINE) ||
  772. (job->flags & FLAG_REINDEXING /* support multi-pass */))) {
  773. if (producer_done && (job->ready_ID == job->lead_ID)) {
  774. /* foreman caught up with the producer, and the producer is
  775. * done.
  776. */
  777. *status = IMPORT_COMPLETE_PASS;
  778. } else {
  779. *status = IMPORT_INCOMPLETE_PASS;
  780. }
  781. } else {
  782. *status = IMPORT_COMPLETE_PASS;
  783. }
  784. return rc;
  785. error_abort:
  786. return ERR_IMPORT_ABORTED;
  787. }
  788. /********** running passes **********/
  789. static int import_run_pass(ImportJob *job, int *status)
  790. {
  791. int ret = 0;
  792. /* Start the threads running */
  793. ret = import_start_threads(job);
  794. if (ret != 0) {
  795. import_log_notice(job, "Starting threads failed: %d\n", ret);
  796. goto error;
  797. }
  798. /* Monitor the threads until we're done or fail */
  799. ret = import_monitor_threads(job, status);
  800. if ((ret == ERR_IMPORT_ABORTED) || (ret == NEED_DN_NORM) ||
  801. (ret == NEED_DN_NORM_SP) || (ret == NEED_DN_NORM_BT)) {
  802. import_log_notice(job, "Thread monitoring returned: %d\n", ret);
  803. goto error;
  804. } else if (ret != 0) {
  805. import_log_notice(job, "Thread monitoring aborted: %d\n", ret);
  806. goto error;
  807. }
  808. error:
  809. return ret;
  810. }
  811. static void import_set_abort_flag_all(ImportJob *job, int wait_for_them)
  812. {
  813. ImportWorkerInfo *worker;
  814. /* tell all the worker threads to abort */
  815. job->flags |= FLAG_ABORT;
  816. /* setting of the flag in the job will be detected in the worker, foreman
  817. * threads and if there are any threads which have a sleeptime 200 msecs
  818. * = import_sleep_time; after that time, they will examine the condition
  819. * (job->flags & FLAG_ABORT) which will unblock the thread to proceed to
  820. * abort. Hence, we will sleep here for atleast 3 sec to make sure clean
  821. * up occurs */
  822. /* allow all the aborts to be processed */
  823. DS_Sleep(PR_MillisecondsToInterval(3000));
  824. if (wait_for_them) {
  825. /* Having done that, wait for them to say that they've stopped */
  826. for (worker = job->worker_list; worker != NULL; ) {
  827. DS_Sleep(PR_MillisecondsToInterval(100));
  828. if ((worker->state != FINISHED) && (worker->state != ABORTED) &&
  829. (worker->state != QUIT)){
  830. continue;
  831. } else {
  832. worker = worker->next;
  833. }
  834. }
  835. }
  836. }
  837. /* tell all the threads to abort */
  838. void import_abort_all(ImportJob *job, int wait_for_them)
  839. {
  840. ImportWorkerInfo *worker;
  841. /* tell all the worker threads to abort */
  842. job->flags |= FLAG_ABORT;
  843. for (worker = job->worker_list; worker; worker = worker->next)
  844. worker->command = ABORT;
  845. if (wait_for_them) {
  846. /* Having done that, wait for them to say that they've stopped */
  847. for (worker = job->worker_list; worker != NULL; ) {
  848. DS_Sleep(PR_MillisecondsToInterval(100));
  849. if ((worker->state != FINISHED) && (worker->state != ABORTED) &&
  850. (worker->state != QUIT)) {
  851. continue;
  852. } else {
  853. worker = worker->next;
  854. }
  855. }
  856. }
  857. }
  858. /* Helper function to make up filenames */
  859. int import_make_merge_filenames(char *directory, char *indexname, int pass,
  860. char **oldname, char **newname)
  861. {
  862. /* Filenames look like this: attributename<LDBM_FILENAME_SUFFIX>
  863. and need to be renamed to: attributename<LDBM_FILENAME_SUFFIX>.n
  864. where n is the pass number.
  865. */
  866. *oldname = slapi_ch_smprintf("%s/%s%s", directory, indexname, LDBM_FILENAME_SUFFIX);
  867. *newname = slapi_ch_smprintf("%s/%s.%d%s", directory, indexname, pass,
  868. LDBM_FILENAME_SUFFIX);
  869. if (!*oldname || !*newname) {
  870. slapi_ch_free_string(oldname);
  871. slapi_ch_free_string(newname);
  872. return -1;
  873. }
  874. return 0;
  875. }
  876. /* Task here is as follows:
  877. * First, if this is pass #1, check for the presence of a merge
  878. * directory. If it is not present, create it.
  879. * If it is present, delete all the files in it.
  880. * Then, flush the dblayer and close files.
  881. * Now create a numbered subdir of the merge directory for this pass.
  882. * Next, move the index files, except entrydn, parentid and id2entry to
  883. * the merge subdirectory. Important to move if we can, because
  884. * that can be millions of times faster than a copy.
  885. * Finally open the dblayer back up because the caller expects
  886. * us to not muck with it.
  887. */
  888. static int import_sweep_after_pass(ImportJob *job)
  889. {
  890. backend *be = job->inst->inst_be;
  891. int ret = 0;
  892. import_log_notice(job, "Sweeping files for merging later...");
  893. ret = dblayer_instance_close(be);
  894. if (0 == ret) {
  895. /* Walk the list of index jobs */
  896. ImportWorkerInfo *current_worker = NULL;
  897. for (current_worker = job->worker_list; current_worker != NULL;
  898. current_worker = current_worker->next) {
  899. /* Foreach job, rename the file to <filename>.n, where n is the
  900. * pass number */
  901. if ((current_worker->work_type != FOREMAN) &&
  902. (current_worker->work_type != PRODUCER) &&
  903. (strcasecmp(current_worker->index_info->name, LDBM_PARENTID_STR) != 0)) {
  904. char *newname = NULL;
  905. char *oldname = NULL;
  906. ret = import_make_merge_filenames(job->inst->inst_dir_name,
  907. current_worker->index_info->name, job->current_pass,
  908. &oldname, &newname);
  909. if (0 != ret) {
  910. break;
  911. }
  912. if (PR_Access(oldname, PR_ACCESS_EXISTS) == PR_SUCCESS) {
  913. ret = PR_Rename(oldname, newname);
  914. if (ret != PR_SUCCESS) {
  915. PRErrorCode prerr = PR_GetError();
  916. import_log_notice(job, "Failed to rename file \"%s\" to \"%s\", "
  917. SLAPI_COMPONENT_NAME_NSPR " error %d (%s)",
  918. oldname, newname, prerr, slapd_pr_strerror(prerr));
  919. slapi_ch_free( (void**)&newname);
  920. slapi_ch_free( (void**)&oldname);
  921. break;
  922. }
  923. }
  924. slapi_ch_free( (void**)&newname);
  925. slapi_ch_free( (void**)&oldname);
  926. }
  927. }
  928. ret = dblayer_instance_start(be, DBLAYER_IMPORT_MODE);
  929. }
  930. if (0 == ret) {
  931. import_log_notice(job, "Sweep done.");
  932. } else {
  933. if (ENOSPC == ret) {
  934. import_log_notice(job, "ERROR: NO DISK SPACE LEFT in sweep phase");
  935. } else {
  936. import_log_notice(job, "ERROR: Sweep phase error %d (%s)", ret,
  937. dblayer_strerror(ret));
  938. }
  939. }
  940. return ret;
  941. }
  942. /* when the import is done, this function is called to bring stuff back up.
  943. * returns 0 on success; anything else is an error
  944. */
  945. static int import_all_done(ImportJob *job, int ret)
  946. {
  947. ldbm_instance *inst = job->inst;
  948. /* Writing this file indicates to future server startups that
  949. * the db is OK unless it's in the dry run mode. */
  950. if ((ret == 0) && !(job->flags & FLAG_DRYRUN)) {
  951. char inst_dir[MAXPATHLEN*2];
  952. char *inst_dirp = NULL;
  953. inst_dirp = dblayer_get_full_inst_dir(inst->inst_li, inst,
  954. inst_dir, MAXPATHLEN*2);
  955. ret = dbversion_write(inst->inst_li, inst_dirp, NULL, DBVERSION_ALL);
  956. if (inst_dirp != inst_dir)
  957. slapi_ch_free_string(&inst_dirp);
  958. }
  959. if ((job->task != NULL) && (0 == slapi_task_get_refcount(job->task))) {
  960. slapi_task_finish(job->task, ret);
  961. }
  962. if (job->flags & FLAG_ONLINE) {
  963. /* make sure the indexes are online as well */
  964. /* richm 20070919 - if index entries are added online, they
  965. are created and marked as INDEX_OFFLINE, in anticipation
  966. of someone doing a db2index. In this case, the db2index
  967. code will correctly unset the INDEX_OFFLINE flag.
  968. However, if import is used to create the indexes, the
  969. INDEX_OFFLINE flag will not be cleared. So, we do that
  970. here
  971. */
  972. IndexInfo *index = job->index_list;
  973. while (index != NULL) {
  974. index->ai->ai_indexmask &= ~INDEX_OFFLINE;
  975. index = index->next;
  976. }
  977. /* start up the instance */
  978. ret = dblayer_instance_start(job->inst->inst_be, DBLAYER_NORMAL_MODE);
  979. if (ret != 0)
  980. return ret;
  981. /* Reset USN slapi_counter with the last key of the entryUSN index */
  982. ldbm_set_last_usn(inst->inst_be);
  983. /* bring backend online again */
  984. slapi_mtn_be_enable(inst->inst_be);
  985. }
  986. return ret;
  987. }
/*
 * The main import engine, run in the caller's thread (offline import /
 * upgradedn / reindex) or on a dedicated thread via import_main() (online
 * import task).  Drives the whole job: sets up indexes and the entry FIFO,
 * spawns the producer thread when reading from LDIF files, runs as many
 * passes as needed (merging between passes), then post-processes
 * (numsubordinates, ancestorid, cache flush) and reports final status.
 *
 * 'arg' is an ImportJob*; ownership transfers to this function, which
 * frees it before returning.  Returns 0 on success, -1/-2 style negative
 * or positive codes on failure (see the dryrun/upgradedn mapping near the
 * end: 1/2/3 encode the NEED_DN_NORM* outcomes for the caller).
 */
int import_main_offline(void *arg)
{
    ImportJob *job = (ImportJob *)arg;
    ldbm_instance *inst = job->inst;
    backend *be = inst->inst_be;
    int ret = 0;
    time_t beginning = 0;
    time_t end = 0;
    int finished = 0;
    int status = 0;
    int verbose = 1;
    int aborted = 0;
    ImportWorkerInfo *producer = NULL;
    char *opstr = "Import";

    /* Hold a task reference for the duration; released on each exit path. */
    if (job->task)
        slapi_task_inc_refcount(job->task);

    /* Pick the operation name used in all log messages below. */
    if (job->flags & (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1)) {
        if (job->flags & FLAG_DRYRUN) {
            opstr = "Upgrade Dn Dryrun";
        } else if ((job->flags & (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1))
                   == (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1)) {
            opstr = "Upgrade Dn (Full)";
        } else if (job->flags & FLAG_UPGRADEDNFORMAT_V1) {
            opstr = "Upgrade Dn (Spaces)";
        } else {
            opstr = "Upgrade Dn (RFC 4514)";
        }
    } else if (job->flags & FLAG_REINDEXING) {
        opstr = "Reindexing";
    }
    PR_ASSERT(inst != NULL);
    time(&beginning);

    /* Decide which indexes are needed */
    if (job->flags & FLAG_INDEX_ATTRS) {
        /* Here, we get an AVL tree which contains nodes for all attributes
         * in the schema. Given this tree, we need to identify those nodes
         * which are marked for indexing. */
        avl_apply(job->inst->inst_attrs, (IFP)import_attr_callback,
                  (caddr_t)job, -1, AVL_INORDER);
        vlv_getindices((IFP)import_attr_callback, (void *)job, be);
    }

    /* Determine how much index buffering space to allocate to each index */
    import_set_index_buffer_size(job);

    /* initialize the entry FIFO */
    ret = import_fifo_init(job);
    if (ret) {
        /* For wire import, wake the thread blocked on wire_cv so it does
         * not wait forever on a job that failed to initialize. */
        if (! (job->flags & FLAG_USE_FILES)) {
            PR_Lock(job->wire_lock);
            PR_NotifyCondVar(job->wire_cv);
            PR_Unlock(job->wire_lock);
        }
        goto error;
    }

    if (job->flags & FLAG_USE_FILES) {
        /* importing from files: start up a producer thread to read the
         * files and queue them
         */
        producer = CALLOC(ImportWorkerInfo);
        if (! producer)
            goto error;
        /* start the producer -- which producer function is used depends on
         * the job type: upgradedn, reindex, or plain LDIF import. */
        import_init_worker_info(producer, job);
        producer->work_type = PRODUCER;
        if (job->flags & (FLAG_UPGRADEDNFORMAT|FLAG_UPGRADEDNFORMAT_V1))
        {
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)upgradedn_producer,
                                producer, PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY,
                          "unable to spawn upgrade dn producer thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                goto error;
            }
        }
        else if (job->flags & FLAG_REINDEXING)
        {
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)index_producer, producer,
                                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD,
                                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY,
                          "unable to spawn index producer thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                goto error;
            }
        }
        else
        {
            import_log_notice(job, "Beginning import job...");
            if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_producer, producer,
                                PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
                                PR_UNJOINABLE_THREAD,
                                SLAPD_DEFAULT_THREAD_STACKSIZE)) {
                PRErrorCode prerr = PR_GetError();
                LDAPDebug(LDAP_DEBUG_ANY,
                          "unable to spawn import producer thread, "
                          SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                          prerr, slapd_pr_strerror(prerr), 0);
                goto error;
            }
        }
        if (0 == job->job_index_buffer_suggestion)
            import_log_notice(job, "Index buffering is disabled.");
        else
            import_log_notice(job,
                              "Index buffering enabled with bucket size %lu",
                              (long unsigned int)job->job_index_buffer_suggestion);
        job->worker_list = producer;
    } else {
        /* release the startup lock and let the entries start queueing up
         * in for import */
        PR_Lock(job->wire_lock);
        PR_NotifyCondVar(job->wire_cv);
        PR_Unlock(job->wire_lock);
    }

    /* Run as many passes as we need to complete the job or die honourably in
     * the attempt */
    while (! finished) {
        job->current_pass++;
        job->total_pass++;
        ret = import_run_pass(job, &status);
        /* The following could have happened:
         *     (a) Some error happened such that we're hosed.
         *         This is indicated by a non-zero return code.
         *     (b) We finished the complete file without needing a second pass
         *         This is indicated by a zero return code and a status of
         *         IMPORT_COMPLETE_PASS and current_pass == 1;
         *     (c) We completed a pass and need at least another one
         *         This is indicated by a zero return code and a status of
         *         IMPORT_INCOMPLETE_PASS
         *     (d) We just completed what turned out to be the last in a
         *         series of passes
         *         This is indicated by a zero return code and a status of
         *         IMPORT_COMPLETE_PASS and current_pass > 1
         */
        if (ret == ERR_IMPORT_ABORTED) {
            /* at least one of the threads has aborted -- shut down ALL
             * of the threads */
            import_log_notice(job, "Aborting all %s threads...", opstr);
            /* this abort sets the abort flag on the threads and will block for
             * the exit of all threads
             */
            import_set_abort_flag_all(job, 1);
            import_log_notice(job, "%s threads aborted.", opstr);
            aborted = 1;
            goto error;
        }
        if ((ret == NEED_DN_NORM) || (ret == NEED_DN_NORM_SP) ||
            (ret == NEED_DN_NORM_BT)) {
            /* Not a failure per se: the final status mapping at the end of
             * this function converts these into return codes 1/2/3. */
            goto error;
        } else if (0 != ret) {
            /* Some horrible fate has befallen the import */
            import_log_notice(job, "Fatal pass error %d", ret);
            goto error;
        }
        /* No error, but a number of possibilities */
        if ( IMPORT_COMPLETE_PASS == status ) {
            if (1 == job->current_pass) {
                /* We're done !!!! */ ;
            } else {
                /* Save the files, then merge */
                ret = import_sweep_after_pass(job);
                if (0 != ret) {
                    goto error;
                }
                ret = import_mega_merge(job);
                if (0 != ret) {
                    goto error;
                }
            }
            finished = 1;
        } else {
            if (IMPORT_INCOMPLETE_PASS == status) {
                /* Need to go round again */
                /* Time to save the files we've built for later */
                ret = import_sweep_after_pass(job);
                if (0 != ret) {
                    goto error;
                }
                /* If we've accumulated too many passes, merge now and
                 * restart the pass counter to bound the number of merge
                 * inputs. */
                if ( (inst->inst_li->li_maxpassbeforemerge != 0) &&
                     (job->current_pass > inst->inst_li->li_maxpassbeforemerge) )
                {
                    ret = import_mega_merge(job);
                    if (0 != ret) {
                        goto error;
                    }
                    job->current_pass = 1;
                    ret = import_sweep_after_pass(job);
                    if (0 != ret) {
                        goto error;
                    }
                }
                /* Fixup the first_ID value to reflect previous work */
                job->first_ID = job->ready_ID + 1;
                import_free_thread_data(job);
                job->worker_list = producer;
                import_log_notice(job, "Beginning pass number %d",
                                  job->total_pass+1);
            } else {
                /* Bizarro-slapd */
                goto error;
            }
        }
    }
    /* kill the producer now; we're done */
    if (producer) {
        import_log_notice(job, "Cleaning up producer thread...");
        producer->command = STOP;
        /* wait for the lead thread to stop */
        while (producer->state != FINISHED) {
            DS_Sleep(PR_MillisecondsToInterval(100));
        }
    }
    import_log_notice(job, "Indexing complete.  Post-processing...");
    /* Now do the numsubordinates attribute */
    /* [610066] reindexed db cannot be used in the following backup/restore */
    if ( (!(job->flags & FLAG_REINDEXING) || (job->flags & FLAG_DN2RDN)) &&
         (ret = update_subordinatecounts(be, job->mothers, job->encrypt, NULL))
         != 0 ) {
        import_log_notice(job, "Failed to update numsubordinates attributes");
        goto error;
    }
    import_log_notice(job, "Generating numSubordinates complete.");
    if (!entryrdn_get_noancestorid()) {
        /* And the ancestorid index */
        /* Creating ancestorid from the scratch; delete the index file first. */
        struct attrinfo *ai = NULL;

        ainfo_get(be, "ancestorid", &ai);
        dblayer_erase_index_file(be, ai, 0);
        if ((ret = ldbm_ancestorid_create_index(be)) != 0) {
            import_log_notice(job, "Failed to create ancestorid index");
            goto error;
        }
    }
    import_log_notice(job, "Flushing caches...");
    if (0 != (ret = dblayer_flush(job->inst->inst_li)) ) {
        import_log_notice(job, "Failed to flush database");
        goto error;
    }
    /* New way to exit the routine: check the return code.
     * If it's non-zero, delete the database files.
     * Otherwise don't, but always close the database layer properly.
     * Then return. This ensures that we can't make a half-good/half-bad
     * Database. */
error:
    /* If we fail, the database is now in a mess, so we delete it
       except dry run mode */
    import_log_notice(job, "Closing files...");
    cache_clear(&job->inst->inst_cache, CACHE_TYPE_ENTRY);
    if (entryrdn_get_switch()) {
        cache_clear(&job->inst->inst_dncache, CACHE_TYPE_DN);
    }
    if (aborted) {
        /* If aborted, it's safer to rebuild the caches. */
        cache_destroy_please(&job->inst->inst_cache, CACHE_TYPE_ENTRY);
        if (entryrdn_get_switch()) { /* subtree-rename: on */
            cache_destroy_please(&job->inst->inst_dncache, CACHE_TYPE_DN);
        }
        /* initialize the entry cache */
        if (! cache_init(&(inst->inst_cache), DEFAULT_CACHE_SIZE,
                         DEFAULT_CACHE_ENTRIES, CACHE_TYPE_ENTRY)) {
            LDAPDebug0Args(LDAP_DEBUG_ANY, "import_main_offline: "
                           "cache_init failed.  Server should be restarted.\n");
        }
        /* initialize the dn cache */
        if (! cache_init(&(inst->inst_dncache), DEFAULT_DNCACHE_SIZE,
                         DEFAULT_DNCACHE_MAXCOUNT, CACHE_TYPE_DN)) {
            LDAPDebug0Args(LDAP_DEBUG_ANY, "import_main_offline: "
                           "dn cache_init failed.  Server should be restarted.\n");
        }
    }
    if (0 != ret) {
        if (!(job->flags & (FLAG_DRYRUN|FLAG_UPGRADEDNFORMAT_V1))) {
            /* If not dryrun NOR upgradedn space */
            /* if running in the dry run mode, don't touch the db */
            dblayer_delete_instance_dir(be);
        }
        dblayer_instance_close(job->inst->inst_be);
    } else {
        if (0 != (ret = dblayer_instance_close(job->inst->inst_be)) ) {
            import_log_notice(job, "Failed to close database");
        }
    }
    if (!(job->flags & FLAG_ONLINE))
        dblayer_close(job->inst->inst_li, DBLAYER_IMPORT_MODE);
    time(&end);

    /* On success, log throughput statistics (entries processed, skipped
     * counts, elapsed wall-clock time and rate). */
    if (verbose && (0 == ret)) {
        int seconds_to_import = end - beginning;
        size_t entries_processed = job->lead_ID - (job->starting_ID - 1);
        double entries_per_second =
            seconds_to_import ?
            (double)entries_processed / (double)seconds_to_import : 0;

        if (job->not_here_skipped) {
            if (job->skipped) {
                import_log_notice(job,
                                  "%s complete.  Processed %lu entries "
                                  "(%d bad entries were skipped, "
                                  "%d entries were skipped because they don't "
                                  "belong to this database) in %d seconds. "
                                  "(%.2f entries/sec)",
                                  opstr, (long unsigned int)entries_processed,
                                  job->skipped, job->not_here_skipped,
                                  seconds_to_import, entries_per_second);
            } else {
                import_log_notice(job,
                                  "%s complete.  Processed %lu entries "
                                  "(%d entries were skipped because they don't "
                                  "belong to this database) "
                                  "in %d seconds. (%.2f entries/sec)",
                                  opstr, (long unsigned int)entries_processed,
                                  job->not_here_skipped, seconds_to_import,
                                  entries_per_second);
            }
        } else {
            if (job->skipped) {
                import_log_notice(job,
                                  "%s complete.  Processed %lu entries "
                                  "(%d were skipped) in %d seconds. "
                                  "(%.2f entries/sec)",
                                  opstr, (long unsigned int)entries_processed,
                                  job->skipped, seconds_to_import,
                                  entries_per_second);
            } else {
                import_log_notice(job,
                                  "%s complete.  Processed %lu entries "
                                  "in %d seconds. (%.2f entries/sec)",
                                  opstr, (long unsigned int)entries_processed,
                                  seconds_to_import, entries_per_second);
            }
        }
    }

    /* Final status: in dryrun/upgradedn-spaces mode, map the NEED_DN_NORM*
     * codes onto small positive return values (1/2/3) that the caller
     * interprets; otherwise finish the task and report success/failure. */
    if (job->flags & (FLAG_DRYRUN|FLAG_UPGRADEDNFORMAT_V1)) {
        if (0 == ret) {
            import_log_notice(job, "%s complete.  %s is up-to-date.",
                              opstr, job->inst->inst_name);
            ret = 0;
            if (job->task) {
                slapi_task_dec_refcount(job->task);
            }
            import_all_done(job, ret);
        } else if (NEED_DN_NORM_BT == ret) {
            import_log_notice(job, "%s complete. %s needs upgradednformat all.",
                              opstr, job->inst->inst_name);
            if (job->task) {
                slapi_task_dec_refcount(job->task);
            }
            import_all_done(job, ret);
            ret = 1;
        } else if (NEED_DN_NORM == ret) {
            import_log_notice(job, "%s complete. %s needs upgradednformat.",
                              opstr, job->inst->inst_name);
            if (job->task) {
                slapi_task_dec_refcount(job->task);
            }
            import_all_done(job, ret);
            ret = 2;
        } else if (NEED_DN_NORM_SP == ret) {
            import_log_notice(job,
                              "%s complete. %s needs upgradednformat spaces.",
                              opstr, job->inst->inst_name);
            if (job->task) {
                slapi_task_dec_refcount(job->task);
            }
            import_all_done(job, ret);
            ret = 3;
        } else {
            ret = -1;
            if (job->task != NULL) {
                slapi_task_finish(job->task, ret);
            }
        }
    } else if (0 != ret) {
        import_log_notice(job, "%s failed.", opstr);
        if (job->task != NULL) {
            slapi_task_finish(job->task, ret);
        }
    } else {
        if (job->task) {
            slapi_task_dec_refcount(job->task);
        }
        import_all_done(job, ret);
    }

    /* This instance isn't busy anymore */
    instance_set_not_busy(job->inst);
    /* We own 'job' (and 'producer'); free them on every path. */
    import_free_job(job);
    FREE(job);
    if (producer)
        FREE(producer);
    return(ret);
}
  1382. /*
  1383. * to be called by online import using PR_CreateThread()
  1384. * offline import directly calls import_main_offline()
  1385. *
  1386. */
  1387. void import_main(void *arg)
  1388. {
  1389. import_main_offline(arg);
  1390. }
/*
 * Entry point for the "deluxe" ldif2db/reindex/upgradedn path.  Builds an
 * ImportJob from the pblock parameters, then either spawns import_main()
 * on a new thread (online task mode, returns immediately) or runs
 * import_main_offline() synchronously.
 *
 * Returns 0 on successful thread spawn (online), the offline import's
 * return code (offline), -1 on bad parameters / OOM, -2 if the thread
 * could not be created.  Ownership of the allocated job transfers to the
 * import thread / offline call, which frees it.
 */
int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb)
{
    backend *be = NULL;
    int noattrindexes = 0;
    ImportJob *job = NULL;
    char **name_array = NULL;
    int total_files, i;
    int up_flags = 0;
    PRThread *thread = NULL;

    job = CALLOC(ImportJob);
    if (job == NULL) {
        LDAPDebug(LDAP_DEBUG_ANY, "not enough memory to do import job\n",
                  0, 0, 0);
        return -1;
    }

    /* Pull the backend and the import parameters out of the pblock. */
    slapi_pblock_get( pb, SLAPI_BACKEND, &be);
    PR_ASSERT(NULL != be);
    job->inst = (ldbm_instance *)be->be_instance_info;
    slapi_pblock_get( pb, SLAPI_LDIF2DB_NOATTRINDEXES, &noattrindexes );
    slapi_pblock_get( pb, SLAPI_LDIF2DB_FILE, &name_array );
    slapi_pblock_get(pb, SLAPI_SEQ_TYPE, &up_flags); /* For upgrade dn and
                                                        dn2rdn */

    /* the removedupvals field is blatantly overloaded here to mean
     * the chunk size too.  (chunk size = number of entries that should
     * be imported before starting a new pass.  usually for debugging.)
     */
    slapi_pblock_get(pb, SLAPI_LDIF2DB_REMOVEDUPVALS, &job->merge_chunk_size);
    if (job->merge_chunk_size == 1)
        job->merge_chunk_size = 0;
    /* get list of specifically included and/or excluded subtrees from
     * the front-end */
    ldbm_back_fetch_incl_excl(pb, &job->include_subtrees,
                              &job->exclude_subtrees);
    /* get cn=tasks info, if any */
    slapi_pblock_get(pb, SLAPI_BACKEND_TASK, &job->task);
    slapi_pblock_get(pb, SLAPI_LDIF2DB_ENCRYPT, &job->encrypt);
    /* get uniqueid info */
    slapi_pblock_get(pb, SLAPI_LDIF2DB_GENERATE_UNIQUEID, &job->uuid_gen_type);
    if (job->uuid_gen_type == SLAPI_UNIQUEID_GENERATE_NAME_BASED) {
        char *namespaceid;

        slapi_pblock_get(pb, SLAPI_LDIF2DB_NAMESPACEID, &namespaceid);
        job->uuid_namespace = slapi_ch_strdup(namespaceid);
    }

    job->flags = FLAG_USE_FILES;
    if (NULL == name_array) {    /* no ldif file is given -> reindexing or
                                    upgradedn */
        /* Translate SLAPI_SEQ_TYPE bits into the job's flag set. */
        if (up_flags & (SLAPI_UPGRADEDNFORMAT|SLAPI_UPGRADEDNFORMAT_V1)) {
            if (up_flags & SLAPI_UPGRADEDNFORMAT) {
                job->flags |= FLAG_UPGRADEDNFORMAT;
            }
            if (up_flags & SLAPI_UPGRADEDNFORMAT_V1) {
                job->flags |= FLAG_UPGRADEDNFORMAT_V1;
            }
            if (up_flags & SLAPI_DRYRUN) {
                job->flags |= FLAG_DRYRUN;
            }
        } else {
            job->flags |= FLAG_REINDEXING; /* call index_producer */
            if (up_flags & SLAPI_UPGRADEDB_DN2RDN) {
                if (entryrdn_get_switch()) {
                    job->flags |= FLAG_DN2RDN; /* migrate to the rdn format */
                } else {
                    LDAPDebug1Arg(LDAP_DEBUG_ANY,
                                  "DN to RDN option is specified, "
                                  "but %s is not enabled\n",
                                  CONFIG_ENTRYRDN_SWITCH);
                    import_free_job(job);
                    FREE(job);
                    return -1;
                }
            }
        }
    }
    if (!noattrindexes) {
        job->flags |= FLAG_INDEX_ATTRS;
    }
    /* Copy the LDIF filename list into the job (job owns the copies). */
    for (i = 0; name_array && name_array[i] != NULL; i++) {
        charray_add(&job->input_filenames, slapi_ch_strdup(name_array[i]));
    }
    job->starting_ID = 1;
    job->first_ID = 1;
    job->mothers = CALLOC(import_subcount_stuff);

    /* how much space should we allocate to index buffering? */
    job->job_index_buffer_size = import_get_index_buffer_size();
    if (job->job_index_buffer_size == 0) {
        /* 10% of the allocated cache size + one meg */
        PR_Lock(job->inst->inst_li->li_config_mutex);
        job->job_index_buffer_size =
            (job->inst->inst_li->li_import_cachesize/10) + (1024*1024);
        PR_Unlock(job->inst->inst_li->li_config_mutex);
    }
    import_subcount_stuff_init(job->mothers);

    if (job->task != NULL) {
        /* count files, use that to track "progress" in cn=tasks */
        total_files = 0;
        while (name_array && name_array[total_files] != NULL)
            total_files++;
        /* add 1 to account for post-import cleanup (which can take a
         * significant amount of time)
         */
        /* NGK - This should eventually be cleaned up to use the public
         * task API. */
        if (0 == total_files) {    /* reindexing */
            job->task->task_work = 2;
        } else {
            job->task->task_work = total_files + 1;
        }
        job->task->task_progress = 0;
        job->task->task_state = SLAPI_TASK_RUNNING;
        slapi_task_set_data(job->task, job);
        slapi_task_set_destructor_fn(job->task, import_task_destroy);
        slapi_task_set_cancel_fn(job->task, import_task_abort);
        job->flags |= FLAG_ONLINE;

        /* create thread for import_main, so we can return */
        thread = PR_CreateThread(PR_USER_THREAD, import_main, (void *)job,
                                 PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
                                 PR_UNJOINABLE_THREAD,
                                 SLAPD_DEFAULT_THREAD_STACKSIZE);
        if (thread == NULL) {
            PRErrorCode prerr = PR_GetError();
            LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import thread, "
                      SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
                      prerr, slapd_pr_strerror(prerr), 0);
            import_free_job(job);
            FREE(job);
            return -2;
        }
        return 0;
    }

    /* old style -- do it all synchronously (THIS IS GOING AWAY SOON) */
    return import_main_offline((void *)job);
}