sync_persist.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2013 Red Hat, Inc.
  3. * All rights reserved.
  4. *
  5. * License: GPL (version 3 or any later version).
  6. * See LICENSE for details.
  7. * END COPYRIGHT BLOCK **/
  8. #include "sync.h"
  9. /* Main list of established persistent synchronizaton searches */
  10. static SyncRequestList *sync_request_list = NULL;
  11. /*
  12. * Convenience macros for locking the list of persistent searches
  13. */
  14. #define SYNC_LOCK_READ() slapi_rwlock_rdlock(sync_request_list->sync_req_rwlock)
  15. #define SYNC_UNLOCK_READ() slapi_rwlock_unlock(sync_request_list->sync_req_rwlock)
  16. #define SYNC_LOCK_WRITE() slapi_rwlock_wrlock(sync_request_list->sync_req_rwlock)
  17. #define SYNC_UNLOCK_WRITE() slapi_rwlock_unlock(sync_request_list->sync_req_rwlock)
  18. /*
  19. * Convenience macro for checking if the Content Synchronization subsystem has
  20. * been initialized.
  21. */
  22. #define SYNC_IS_INITIALIZED() (sync_request_list != NULL)
  23. static int plugin_closing = 0;
  24. static PRUint64 thread_count = 0;
  25. static int sync_add_request(SyncRequest *req);
  26. static void sync_remove_request(SyncRequest *req);
  27. static SyncRequest *sync_request_alloc(void);
  28. void sync_queue_change(Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype);
  29. static void sync_send_results(void *arg);
  30. static void sync_request_wakeup_all(void);
  31. static void sync_node_free(SyncQueueNode **node);
  32. static int sync_acquire_connection(Slapi_Connection *conn);
  33. static int sync_release_connection(Slapi_PBlock *pb, Slapi_Connection *conn, Slapi_Operation *op, int release);
  34. int
  35. sync_add_persist_post_op(Slapi_PBlock *pb)
  36. {
  37. Slapi_Entry *e;
  38. if (!SYNC_IS_INITIALIZED()) {
  39. return (0);
  40. }
  41. slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
  42. sync_queue_change(e, NULL, LDAP_REQ_ADD);
  43. return (0);
  44. }
  45. int
  46. sync_del_persist_post_op(Slapi_PBlock *pb)
  47. {
  48. Slapi_Entry *e;
  49. if (!SYNC_IS_INITIALIZED()) {
  50. return (0);
  51. }
  52. slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e);
  53. sync_queue_change(e, NULL, LDAP_REQ_DELETE);
  54. return (0);
  55. }
  56. int
  57. sync_mod_persist_post_op(Slapi_PBlock *pb)
  58. {
  59. Slapi_Entry *e, *e_prev;
  60. if (!SYNC_IS_INITIALIZED()) {
  61. return (0);
  62. }
  63. slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
  64. slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e_prev);
  65. sync_queue_change(e, e_prev, LDAP_REQ_MODIFY);
  66. return (0);
  67. }
  68. int
  69. sync_modrdn_persist_post_op(Slapi_PBlock *pb)
  70. {
  71. Slapi_Entry *e, *e_prev;
  72. if (!SYNC_IS_INITIALIZED()) {
  73. return (0);
  74. }
  75. slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
  76. slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e_prev);
  77. sync_queue_change(e, e_prev, LDAP_REQ_MODRDN);
  78. return (0);
  79. }
  80. void
  81. sync_queue_change(Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype)
  82. {
  83. SyncRequest *req = NULL;
  84. SyncQueueNode *node = NULL;
  85. int matched = 0;
  86. int prev_match = 0;
  87. int cur_match = 0;
  88. if (!SYNC_IS_INITIALIZED()) {
  89. return;
  90. }
  91. if (NULL == e) {
  92. /* For now, some backends such as the chaining backend do not provide a post-op entry */
  93. return;
  94. }
  95. SYNC_LOCK_READ();
  96. for (req = sync_request_list->sync_req_head; NULL != req; req = req->req_next) {
  97. Slapi_DN *base = NULL;
  98. int scope;
  99. Slapi_Operation *op;
  100. /* Skip the nodes that have no more active operation
  101. */
  102. slapi_pblock_get(req->req_pblock, SLAPI_OPERATION, &op);
  103. if (op == NULL || slapi_op_abandoned(req->req_pblock)) {
  104. continue;
  105. }
  106. slapi_pblock_get(req->req_pblock, SLAPI_SEARCH_TARGET_SDN, &base);
  107. slapi_pblock_get(req->req_pblock, SLAPI_SEARCH_SCOPE, &scope);
  108. if (NULL == base) {
  109. base = slapi_sdn_new_dn_byref(req->req_orig_base);
  110. slapi_pblock_set(req->req_pblock, SLAPI_SEARCH_TARGET_SDN, base);
  111. }
  112. /*
  113. * See if the entry meets the scope and filter criteria.
  114. * We cannot do the acl check here as this thread
  115. * would then potentially clash with the ps_send_results()
  116. * thread on the aclpb in ps->req_pblock.
  117. * By avoiding the acl check in this thread, and leaving all the acl
  118. * checking to the ps_send_results() thread we avoid
  119. * the req_pblock contention problem.
  120. * The lesson here is "Do not give multiple threads arbitary access
  121. * to the same pblock" this kind of muti-threaded access
  122. * to the same pblock must be done carefully--there is currently no
  123. * generic satisfactory way to do this.
  124. */
  125. /* if the change is a modrdn then we need to check if the entry was
  126. * moved into scope, out of scope, or stays in scope
  127. */
  128. if (chgtype == LDAP_REQ_MODRDN || chgtype == LDAP_REQ_MODIFY)
  129. prev_match = slapi_sdn_scope_test(slapi_entry_get_sdn_const(eprev), base, scope) &&
  130. (0 == slapi_vattr_filter_test(req->req_pblock, eprev, req->req_filter, 0 /* verify_access */));
  131. cur_match = slapi_sdn_scope_test(slapi_entry_get_sdn_const(e), base, scope) &&
  132. (0 == slapi_vattr_filter_test(req->req_pblock, e, req->req_filter, 0 /* verify_access */));
  133. if (prev_match || cur_match) {
  134. SyncQueueNode *pOldtail;
  135. /* The scope and the filter match - enqueue it */
  136. matched++;
  137. node = (SyncQueueNode *)slapi_ch_calloc(1, sizeof(SyncQueueNode));
  138. if (chgtype == LDAP_REQ_MODRDN || chgtype == LDAP_REQ_MODIFY) {
  139. if (prev_match && cur_match)
  140. node->sync_chgtype = LDAP_REQ_MODIFY;
  141. else if (prev_match)
  142. node->sync_chgtype = LDAP_REQ_DELETE;
  143. else
  144. node->sync_chgtype = LDAP_REQ_ADD;
  145. } else {
  146. node->sync_chgtype = chgtype;
  147. }
  148. if (node->sync_chgtype == LDAP_REQ_DELETE && chgtype == LDAP_REQ_MODIFY) {
  149. /* use previous entry to pass the filter test in sync_send_results */
  150. node->sync_entry = slapi_entry_dup(eprev);
  151. } else {
  152. node->sync_entry = slapi_entry_dup(e);
  153. }
  154. /* Put it on the end of the list for this sync search */
  155. PR_Lock(req->req_lock);
  156. pOldtail = req->ps_eq_tail;
  157. req->ps_eq_tail = node;
  158. if (NULL == req->ps_eq_head) {
  159. req->ps_eq_head = req->ps_eq_tail;
  160. } else {
  161. pOldtail->sync_next = req->ps_eq_tail;
  162. }
  163. PR_Unlock(req->req_lock);
  164. }
  165. }
  166. SYNC_UNLOCK_READ();
  167. /* Were there any matches? */
  168. if (matched) {
  169. /* Notify update threads */
  170. sync_request_wakeup_all();
  171. slapi_log_err(SLAPI_LOG_TRACE, SYNC_PLUGIN_SUBSYSTEM, "sync_queue_change - enqueued entry "
  172. "\"%s\" on %d request listeners\n",
  173. slapi_entry_get_dn_const(e), matched);
  174. } else {
  175. slapi_log_err(SLAPI_LOG_TRACE, SYNC_PLUGIN_SUBSYSTEM, "sync_queue_change - entry "
  176. "\"%s\" not enqueued on any request search listeners\n",
  177. slapi_entry_get_dn_const(e));
  178. }
  179. }
  180. /*
  181. * Initialize the list structure which contains the list
  182. * of established content sync persistent requests
  183. */
  184. int
  185. sync_persist_initialize(int argc, char **argv)
  186. {
  187. if (!SYNC_IS_INITIALIZED()) {
  188. sync_request_list = (SyncRequestList *)slapi_ch_calloc(1, sizeof(SyncRequestList));
  189. if ((sync_request_list->sync_req_rwlock = slapi_new_rwlock()) == NULL) {
  190. slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_initialize - Cannot initialize lock structure(1).\n");
  191. return (-1);
  192. }
  193. if ((sync_request_list->sync_req_cvarlock = PR_NewLock()) == NULL) {
  194. slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_initialize - Cannot initialize lock structure(2).\n");
  195. return (-1);
  196. }
  197. if ((sync_request_list->sync_req_cvar = PR_NewCondVar(sync_request_list->sync_req_cvarlock)) == NULL) {
  198. slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_initialize - Cannot initialize condition variable.\n");
  199. return (-1);
  200. }
  201. sync_request_list->sync_req_head = NULL;
  202. sync_request_list->sync_req_cur_persist = 0;
  203. sync_request_list->sync_req_max_persist = SYNC_MAX_CONCURRENT;
  204. if (argc > 0) {
  205. /* for now the only plugin arg is the max concurrent
  206. * persistent sync searches
  207. */
  208. sync_request_list->sync_req_max_persist = sync_number2int(argv[0]);
  209. if (sync_request_list->sync_req_max_persist == -1) {
  210. sync_request_list->sync_req_max_persist = SYNC_MAX_CONCURRENT;
  211. }
  212. }
  213. plugin_closing = 0;
  214. }
  215. return (0);
  216. }
  217. /*
  218. * Add the given pblock to the list of established sync searches.
  219. * Then, start a thread to send the results to the client as they
  220. * are dispatched by add, modify, and modrdn operations.
  221. */
  222. PRThread *
  223. sync_persist_add(Slapi_PBlock *pb)
  224. {
  225. SyncRequest *req = NULL;
  226. char *base;
  227. Slapi_Filter *filter;
  228. if (SYNC_IS_INITIALIZED() && NULL != pb) {
  229. /* Create the new node */
  230. req = sync_request_alloc();
  231. slapi_pblock_get(pb, SLAPI_OPERATION, &req->req_orig_op); /* neede to access original op */
  232. req->req_pblock = sync_pblock_copy(pb);
  233. slapi_pblock_get(pb, SLAPI_ORIGINAL_TARGET_DN, &base);
  234. req->req_orig_base = slapi_ch_strdup(base);
  235. slapi_pblock_get(pb, SLAPI_SEARCH_FILTER, &filter);
  236. req->req_filter = slapi_filter_dup(filter);
  237. /* Add it to the head of the list of persistent searches */
  238. if (0 == sync_add_request(req)) {
  239. /* Start a thread to send the results */
  240. req->req_tid = PR_CreateThread(PR_USER_THREAD, sync_send_results,
  241. (void *)req, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
  242. PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE);
  243. /* Checking if the thread is succesfully created and
  244. * if the thread is not created succesfully.... we send
  245. * error messages to the Log file
  246. */
  247. if (NULL == (req->req_tid)) {
  248. int prerr;
  249. prerr = PR_GetError();
  250. slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM,
  251. "sync_persist_add - Failed to create persitent thread, error %d (%s)\n",
  252. prerr, slapi_pr_strerror(prerr));
  253. /* Now remove the ps from the list so call the function ps_remove */
  254. sync_remove_request(req);
  255. PR_DestroyLock(req->req_lock);
  256. req->req_lock = NULL;
  257. slapi_ch_free((void **)&req->req_pblock);
  258. slapi_ch_free((void **)&req);
  259. } else {
  260. thread_count++;
  261. return (req->req_tid);
  262. }
  263. }
  264. }
  265. return (NULL);
  266. }
  267. int
  268. sync_persist_startup(PRThread *tid, Sync_Cookie *cookie)
  269. {
  270. SyncRequest *cur;
  271. int rc = 1;
  272. if (SYNC_IS_INITIALIZED() && NULL != tid) {
  273. SYNC_LOCK_READ();
  274. /* Find and change */
  275. cur = sync_request_list->sync_req_head;
  276. while (NULL != cur) {
  277. if (cur->req_tid == tid) {
  278. cur->req_active = PR_TRUE;
  279. cur->req_cookie = cookie;
  280. rc = 0;
  281. break;
  282. }
  283. cur = cur->req_next;
  284. }
  285. SYNC_UNLOCK_READ();
  286. }
  287. return (rc);
  288. }
  289. int
  290. sync_persist_terminate(PRThread *tid)
  291. {
  292. SyncRequest *cur;
  293. int rc = 1;
  294. if (SYNC_IS_INITIALIZED() && NULL != tid) {
  295. SYNC_LOCK_READ();
  296. /* Find and change */
  297. cur = sync_request_list->sync_req_head;
  298. while (NULL != cur) {
  299. if (cur->req_tid == tid) {
  300. cur->req_active = PR_FALSE;
  301. cur->req_complete = PR_TRUE;
  302. rc = 0;
  303. break;
  304. }
  305. cur = cur->req_next;
  306. }
  307. SYNC_UNLOCK_READ();
  308. }
  309. if (rc == 0) {
  310. sync_remove_request(cur);
  311. }
  312. return (rc);
  313. }
  314. /*
  315. * Called when stopping/disabling the plugin
  316. */
  317. int
  318. sync_persist_terminate_all()
  319. {
  320. if (SYNC_IS_INITIALIZED()) {
  321. /* signal the threads to stop */
  322. plugin_closing = 1;
  323. sync_request_wakeup_all();
  324. /* wait for all the threads to finish */
  325. while (thread_count > 0) {
  326. PR_Sleep(PR_SecondsToInterval(1));
  327. }
  328. slapi_destroy_rwlock(sync_request_list->sync_req_rwlock);
  329. PR_DestroyLock(sync_request_list->sync_req_cvarlock);
  330. PR_DestroyCondVar(sync_request_list->sync_req_cvar);
  331. slapi_ch_free((void **)&sync_request_list);
  332. }
  333. return (0);
  334. }
  335. /*
  336. * Allocate and initialize an empty Sync node.
  337. */
  338. static SyncRequest *
  339. sync_request_alloc(void)
  340. {
  341. SyncRequest *req;
  342. req = (SyncRequest *)slapi_ch_calloc(1, sizeof(SyncRequest));
  343. req->req_pblock = NULL;
  344. if ((req->req_lock = PR_NewLock()) == NULL) {
  345. slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "sync_request_alloc - Cannot initialize lock structure.\n");
  346. slapi_ch_free((void **)&req);
  347. return (NULL);
  348. }
  349. req->req_tid = (PRThread *)NULL;
  350. req->req_complete = 0;
  351. req->req_cookie = NULL;
  352. req->ps_eq_head = req->ps_eq_tail = (SyncQueueNode *)NULL;
  353. req->req_next = NULL;
  354. req->req_active = PR_FALSE;
  355. return req;
  356. }
  357. /*
  358. * Add the given persistent search to the
  359. * head of the list of persistent searches.
  360. */
  361. static int
  362. sync_add_request(SyncRequest *req)
  363. {
  364. int rc = 0;
  365. if (SYNC_IS_INITIALIZED() && NULL != req) {
  366. SYNC_LOCK_WRITE();
  367. if (sync_request_list->sync_req_cur_persist < sync_request_list->sync_req_max_persist) {
  368. sync_request_list->sync_req_cur_persist++;
  369. req->req_next = sync_request_list->sync_req_head;
  370. sync_request_list->sync_req_head = req;
  371. } else {
  372. rc = 1;
  373. }
  374. SYNC_UNLOCK_WRITE();
  375. }
  376. return (rc);
  377. }
  378. static void
  379. sync_remove_request(SyncRequest *req)
  380. {
  381. SyncRequest *cur;
  382. int removed = 0;
  383. if (SYNC_IS_INITIALIZED() && NULL != req) {
  384. SYNC_LOCK_WRITE();
  385. if (NULL == sync_request_list->sync_req_head) {
  386. /* should not happen, attempt to remove a request never added */
  387. } else if (req == sync_request_list->sync_req_head) {
  388. /* Remove from head */
  389. sync_request_list->sync_req_head = sync_request_list->sync_req_head->req_next;
  390. removed = 1;
  391. } else {
  392. /* Find and remove from list */
  393. cur = sync_request_list->sync_req_head;
  394. while (NULL != cur->req_next) {
  395. if (cur->req_next == req) {
  396. cur->req_next = cur->req_next->req_next;
  397. removed = 1;
  398. break;
  399. } else {
  400. cur = cur->req_next;
  401. }
  402. }
  403. }
  404. if (removed) {
  405. sync_request_list->sync_req_cur_persist--;
  406. }
  407. SYNC_UNLOCK_WRITE();
  408. if (!removed) {
  409. slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_remove_request - "
  410. "Attempt to remove nonexistent req\n");
  411. }
  412. }
  413. }
  414. static void
  415. sync_request_wakeup_all(void)
  416. {
  417. if (SYNC_IS_INITIALIZED()) {
  418. PR_Lock(sync_request_list->sync_req_cvarlock);
  419. PR_NotifyAllCondVar(sync_request_list->sync_req_cvar);
  420. PR_Unlock(sync_request_list->sync_req_cvarlock);
  421. }
  422. }
  423. static int
  424. sync_acquire_connection(Slapi_Connection *conn)
  425. {
  426. int rc;
  427. /* need to acquire a reference to this connection so that it will not
  428. be released or cleaned up out from under us
  429. in psearch.c it is implemented as:
  430. PR_Lock( ps->req_pblock->pb_conn->c_mutex );
  431. conn_acq_flag = connection_acquire_nolock(ps->req_pblock->pb_conn);
  432. PR_Unlock( ps->req_pblock->pb_conn->c_mutex );
  433. HOW TO DO FROM A PLUGIN
  434. - either expose the functions from the connection code in the private api
  435. and allow to link them in
  436. - or fake a connection structure
  437. struct fake_conn {
  438. void *needed1
  439. void *needed2
  440. void *pad1
  441. void *pad2
  442. void *needed3;
  443. }
  444. struct fake_conn *c = (struct fake_conn *) conn;
  445. c->needed3 ++;
  446. this would require knowledge or analysis of the connection structure,
  447. could probably be done for servers with a common history
  448. */
  449. /* use exposed slapi_connection functions */
  450. rc = slapi_connection_acquire(conn);
  451. return (rc);
  452. }
  453. static int
  454. sync_release_connection(Slapi_PBlock *pb, Slapi_Connection *conn, Slapi_Operation *op, int release)
  455. {
  456. /* see comments in sync_acquire_connection */
  457. /* using exposed connection handling functions */
  458. slapi_connection_remove_operation(pb, conn, op, release);
  459. return (0);
  460. }
  461. /*
  462. * Thread routine for sending search results to a client
  463. * which is persistently waiting for them.
  464. *
  465. * This routine will terminate when either (a) the ps_complete
  466. * flag is set, or (b) the associated operation is abandoned.
  467. * In any case, the thread won't notice until it wakes from
  468. * sleeping on the ps_list condition variable, so it needs
  469. * to be awakened.
  470. */
  471. static void
  472. sync_send_results(void *arg)
  473. {
  474. SyncRequest *req = (SyncRequest *)arg;
  475. SyncQueueNode *qnode, *qnodenext;
  476. int conn_acq_flag = 0;
  477. Slapi_Connection *conn = NULL;
  478. Slapi_Operation *op = req->req_orig_op;
  479. int rc;
  480. PRUint64 connid;
  481. int opid;
  482. slapi_pblock_get(req->req_pblock, SLAPI_CONN_ID, &connid);
  483. slapi_pblock_get(req->req_pblock, SLAPI_OPERATION_ID, &opid);
  484. slapi_pblock_get(req->req_pblock, SLAPI_CONNECTION, &conn);
  485. if (NULL == conn) {
  486. slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM,
  487. "sync_send_results - conn=%" PRIu64 " op=%d Null connection - aborted\n",
  488. connid, opid);
  489. goto done;
  490. }
  491. conn_acq_flag = sync_acquire_connection(conn);
  492. if (conn_acq_flag) {
  493. slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM,
  494. "sync_send_results - conn=%" PRIu64 " op=%d Could not acquire the connection - aborted\n",
  495. connid, opid);
  496. goto done;
  497. }
  498. PR_Lock(sync_request_list->sync_req_cvarlock);
  499. while ((conn_acq_flag == 0) && !req->req_complete && !plugin_closing) {
  500. /* Check for an abandoned operation */
  501. if (op == NULL || slapi_is_operation_abandoned(op)) {
  502. slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM,
  503. "sync_send_results - conn=%" PRIu64 " op=%d Operation no longer active - terminating\n",
  504. connid, opid);
  505. break;
  506. }
  507. if (NULL == req->ps_eq_head || !req->req_active) {
  508. /* Nothing to do yet, or the refresh phase is not yet completed */
  509. /* If an operation is abandoned, we do not get notified by the
  510. * connection code. Wake up every second to check if thread
  511. * should terminate.
  512. */
  513. PR_WaitCondVar(sync_request_list->sync_req_cvar, PR_SecondsToInterval(1));
  514. } else {
  515. /* dequeue the item */
  516. int attrsonly;
  517. char **attrs;
  518. char **noattrs = NULL;
  519. LDAPControl **ectrls = NULL;
  520. Slapi_Entry *ec;
  521. int chg_type = LDAP_SYNC_NONE;
  522. /* deque one element */
  523. PR_Lock(req->req_lock);
  524. qnode = req->ps_eq_head;
  525. req->ps_eq_head = qnode->sync_next;
  526. if (NULL == req->ps_eq_head) {
  527. req->ps_eq_tail = NULL;
  528. }
  529. PR_Unlock(req->req_lock);
  530. /* Get all the information we need to send the result */
  531. ec = qnode->sync_entry;
  532. slapi_pblock_get(req->req_pblock, SLAPI_SEARCH_ATTRS, &attrs);
  533. slapi_pblock_get(req->req_pblock, SLAPI_SEARCH_ATTRSONLY, &attrsonly);
  534. /*
  535. * Send the result. Since send_ldap_search_entry can block for
  536. * up to 30 minutes, we relinquish all locks before calling it.
  537. */
  538. PR_Unlock(sync_request_list->sync_req_cvarlock);
  539. /*
  540. * The entry is in the right scope and matches the filter
  541. * but we need to redo the filter test here to check access
  542. * controls. See the comments at the slapi_filter_test()
  543. * call in sync_persist_add().
  544. */
  545. if (slapi_vattr_filter_test(req->req_pblock, ec, req->req_filter,
  546. 1 /* verify_access */) == 0) {
  547. slapi_pblock_set(req->req_pblock, SLAPI_SEARCH_RESULT_ENTRY, ec);
  548. /* NEED TO BUILD THE CONTROL */
  549. switch (qnode->sync_chgtype) {
  550. case LDAP_REQ_ADD:
  551. chg_type = LDAP_SYNC_ADD;
  552. break;
  553. case LDAP_REQ_MODIFY:
  554. chg_type = LDAP_SYNC_MODIFY;
  555. break;
  556. case LDAP_REQ_MODRDN:
  557. chg_type = LDAP_SYNC_MODIFY;
  558. break;
  559. case LDAP_REQ_DELETE:
  560. chg_type = LDAP_SYNC_DELETE;
  561. noattrs = (char **)slapi_ch_calloc(2, sizeof(char *));
  562. noattrs[0] = slapi_ch_strdup("1.1");
  563. noattrs[1] = NULL;
  564. break;
  565. }
  566. ectrls = (LDAPControl **)slapi_ch_calloc(2, sizeof(LDAPControl *));
  567. if (req->req_cookie)
  568. sync_cookie_update(req->req_cookie, ec);
  569. sync_create_state_control(ec, &ectrls[0], chg_type, req->req_cookie);
  570. rc = slapi_send_ldap_search_entry(req->req_pblock,
  571. ec, ectrls,
  572. noattrs ? noattrs : attrs, attrsonly);
  573. if (rc) {
  574. slapi_log_err(SLAPI_LOG_CONNS, SYNC_PLUGIN_SUBSYSTEM,
  575. "sync_send_results - Error %d sending entry %s\n",
  576. rc, slapi_entry_get_dn_const(ec));
  577. }
  578. ldap_controls_free(ectrls);
  579. slapi_ch_array_free(noattrs);
  580. }
  581. PR_Lock(sync_request_list->sync_req_cvarlock);
  582. /* Deallocate our wrapper for this entry */
  583. sync_node_free(&qnode);
  584. }
  585. }
  586. PR_Unlock(sync_request_list->sync_req_cvarlock);
  587. /* indicate the end of search */
  588. sync_release_connection(req->req_pblock, conn, op, conn_acq_flag == 0);
  589. done:
  590. sync_remove_request(req);
  591. PR_DestroyLock(req->req_lock);
  592. req->req_lock = NULL;
  593. slapi_ch_free((void **)&req->req_pblock);
  594. slapi_ch_free((void **)&req->req_orig_base);
  595. slapi_filter_free(req->req_filter, 1);
  596. sync_cookie_free(&req->req_cookie);
  597. for (qnode = req->ps_eq_head; qnode; qnode = qnodenext) {
  598. qnodenext = qnode->sync_next;
  599. sync_node_free(&qnode);
  600. }
  601. slapi_ch_free((void **)&req);
  602. thread_count--;
  603. }
  604. /*
  605. * Free a sync update node (and everything it holds).
  606. */
  607. static void
  608. sync_node_free(SyncQueueNode **node)
  609. {
  610. if (node != NULL && *node != NULL) {
  611. if ((*node)->sync_entry != NULL) {
  612. slapi_entry_free((*node)->sync_entry);
  613. (*node)->sync_entry = NULL;
  614. }
  615. slapi_ch_free((void **)node);
  616. }
  617. }