1
0

sync_persist.c 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2013 Red Hat, Inc.
  3. * All rights reserved.
  4. *
  5. * License: GPL (version 3 or any later version).
  6. * See LICENSE for details.
  7. * END COPYRIGHT BLOCK **/
  8. #include "sync.h"
  9. /* Main list of established persistent synchronizaton searches */
  10. static SyncRequestList *sync_request_list = NULL;
  11. /*
  12. * Convenience macros for locking the list of persistent searches
  13. */
  14. #define SYNC_LOCK_READ() slapi_rwlock_rdlock(sync_request_list->sync_req_rwlock)
  15. #define SYNC_UNLOCK_READ() slapi_rwlock_unlock(sync_request_list->sync_req_rwlock)
  16. #define SYNC_LOCK_WRITE() slapi_rwlock_wrlock(sync_request_list->sync_req_rwlock)
  17. #define SYNC_UNLOCK_WRITE() slapi_rwlock_unlock(sync_request_list->sync_req_rwlock)
  18. /*
  19. * Convenience macro for checking if the Content Synchronization subsystem has
  20. * been initialized.
  21. */
  22. #define SYNC_IS_INITIALIZED() (sync_request_list != NULL)
  23. static int plugin_closing = 0;
  24. static PRUint64 thread_count = 0;
  25. static int sync_add_request( SyncRequest *req );
  26. static void sync_remove_request( SyncRequest *req );
  27. static SyncRequest *sync_request_alloc();
  28. void sync_queue_change( Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype );
  29. static void sync_send_results( void *arg );
  30. static void sync_request_wakeup_all();
  31. static void sync_node_free( SyncQueueNode **node );
  32. static int sync_acquire_connection (Slapi_Connection *conn);
  33. static int sync_release_connection (Slapi_PBlock *pb, Slapi_Connection *conn, Slapi_Operation *op, int release);
  34. int sync_add_persist_post_op(Slapi_PBlock *pb)
  35. {
  36. Slapi_Entry *e;
  37. if ( !SYNC_IS_INITIALIZED()) {
  38. return(0);
  39. }
  40. slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
  41. sync_queue_change(e, NULL, LDAP_REQ_ADD);
  42. return( 0 );
  43. }
  44. int sync_del_persist_post_op(Slapi_PBlock *pb)
  45. {
  46. Slapi_Entry *e;
  47. if ( !SYNC_IS_INITIALIZED()) {
  48. return(0);
  49. }
  50. slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e);
  51. sync_queue_change(e, NULL, LDAP_REQ_DELETE);
  52. return( 0 );
  53. }
  54. int sync_mod_persist_post_op(Slapi_PBlock *pb)
  55. {
  56. Slapi_Entry *e, *e_prev;
  57. if ( !SYNC_IS_INITIALIZED()) {
  58. return(0);
  59. }
  60. slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
  61. slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e_prev);
  62. sync_queue_change(e, e_prev, LDAP_REQ_MODIFY);
  63. return( 0 );
  64. }
  65. int sync_modrdn_persist_post_op(Slapi_PBlock *pb)
  66. {
  67. Slapi_Entry *e, *e_prev;
  68. if ( !SYNC_IS_INITIALIZED()) {
  69. return(0);
  70. }
  71. slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
  72. slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e_prev);
  73. sync_queue_change(e, e_prev, LDAP_REQ_MODRDN);
  74. return( 0 );
  75. }
  76. void
  77. sync_queue_change( Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype )
  78. {
  79. SyncRequest *req = NULL;
  80. SyncQueueNode *node = NULL;
  81. int matched = 0;
  82. int prev_match = 0;
  83. int cur_match = 0;
  84. if ( !SYNC_IS_INITIALIZED()) {
  85. return;
  86. }
  87. if ( NULL == e ) {
  88. /* For now, some backends such as the chaining backend do not provide a post-op entry */
  89. return;
  90. }
  91. SYNC_LOCK_READ();
  92. for ( req = sync_request_list->sync_req_head; NULL != req; req = req->req_next ) {
  93. Slapi_DN *base = NULL;
  94. int scope;
  95. Slapi_Operation *op;
  96. /* Skip the nodes that have no more active operation
  97. */
  98. slapi_pblock_get( req->req_pblock, SLAPI_OPERATION, &op );
  99. if ( op == NULL || slapi_op_abandoned( req->req_pblock ) ) {
  100. continue;
  101. }
  102. slapi_pblock_get( req->req_pblock, SLAPI_SEARCH_TARGET_SDN, &base );
  103. slapi_pblock_get( req->req_pblock, SLAPI_SEARCH_SCOPE, &scope );
  104. if (NULL == base) {
  105. base = slapi_sdn_new_dn_byref(req->req_orig_base);
  106. slapi_pblock_set(req->req_pblock, SLAPI_SEARCH_TARGET_SDN, base);
  107. }
  108. /*
  109. * See if the entry meets the scope and filter criteria.
  110. * We cannot do the acl check here as this thread
  111. * would then potentially clash with the ps_send_results()
  112. * thread on the aclpb in ps->req_pblock.
  113. * By avoiding the acl check in this thread, and leaving all the acl
  114. * checking to the ps_send_results() thread we avoid
  115. * the req_pblock contention problem.
  116. * The lesson here is "Do not give multiple threads arbitary access
  117. * to the same pblock" this kind of muti-threaded access
  118. * to the same pblock must be done carefully--there is currently no
  119. * generic satisfactory way to do this.
  120. */
  121. /* if the change is a modrdn then we need to check if the entry was
  122. * moved into scope, out of scope, or stays in scope
  123. */
  124. if (chgtype == LDAP_REQ_MODRDN || chgtype == LDAP_REQ_MODIFY)
  125. prev_match = slapi_sdn_scope_test( slapi_entry_get_sdn_const(eprev), base, scope ) &&
  126. ( 0 == slapi_vattr_filter_test( req->req_pblock, eprev, req->req_filter, 0 /* verify_access */ ));
  127. cur_match = slapi_sdn_scope_test( slapi_entry_get_sdn_const(e), base, scope ) &&
  128. ( 0 == slapi_vattr_filter_test( req->req_pblock, e, req->req_filter, 0 /* verify_access */ ));
  129. if (prev_match || cur_match) {
  130. SyncQueueNode *pOldtail;
  131. /* The scope and the filter match - enqueue it */
  132. matched++;
  133. node = (SyncQueueNode *)slapi_ch_calloc( 1, sizeof( SyncQueueNode ));
  134. if ( chgtype == LDAP_REQ_MODRDN || chgtype == LDAP_REQ_MODIFY) {
  135. if (prev_match && cur_match)
  136. node->sync_chgtype = LDAP_REQ_MODIFY;
  137. else if (prev_match)
  138. node->sync_chgtype = LDAP_REQ_DELETE;
  139. else
  140. node->sync_chgtype = LDAP_REQ_ADD;
  141. } else {
  142. node->sync_chgtype = chgtype;
  143. }
  144. if (node->sync_chgtype == LDAP_REQ_DELETE && chgtype == LDAP_REQ_MODIFY ) {
  145. /* use previous entry to pass the filter test in sync_send_results */
  146. node->sync_entry = slapi_entry_dup( eprev );
  147. } else {
  148. node->sync_entry = slapi_entry_dup( e );
  149. }
  150. /* Put it on the end of the list for this sync search */
  151. PR_Lock( req->req_lock );
  152. pOldtail = req->ps_eq_tail;
  153. req->ps_eq_tail = node;
  154. if ( NULL == req->ps_eq_head ) {
  155. req->ps_eq_head = req->ps_eq_tail;
  156. }
  157. else {
  158. pOldtail->sync_next = req->ps_eq_tail;
  159. }
  160. PR_Unlock( req->req_lock );
  161. }
  162. }
  163. SYNC_UNLOCK_READ();
  164. /* Were there any matches? */
  165. if ( matched ) {
  166. /* Notify update threads */
  167. sync_request_wakeup_all();
  168. slapi_log_error (SLAPI_LOG_TRACE, SYNC_PLUGIN_SUBSYSTEM, "sync search: enqueued entry "
  169. "\"%s\" on %d request listeners\n", slapi_entry_get_dn_const(e), matched );
  170. } else {
  171. slapi_log_error (SLAPI_LOG_TRACE, SYNC_PLUGIN_SUBSYSTEM, "sync search: entry "
  172. "\"%s\" not enqueued on any request search listeners\n", slapi_entry_get_dn_const(e) );
  173. }
  174. }
  175. /*
  176. * Initialize the list structure which contains the list
  177. * of established content sync persistent requests
  178. */
  179. int
  180. sync_persist_initialize (int argc, char **argv)
  181. {
  182. if ( !SYNC_IS_INITIALIZED()) {
  183. sync_request_list = (SyncRequestList *) slapi_ch_calloc( 1, sizeof( SyncRequestList ));
  184. if (( sync_request_list->sync_req_rwlock = slapi_new_rwlock()) == NULL ) {
  185. slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_initialize: cannot initialize lock structure(1). ");
  186. return( -1 );
  187. }
  188. if (( sync_request_list->sync_req_cvarlock = PR_NewLock()) == NULL ) {
  189. slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_initialize: cannot initialize lock structure(2). ");
  190. return( -1 );
  191. }
  192. if (( sync_request_list->sync_req_cvar = PR_NewCondVar( sync_request_list->sync_req_cvarlock )) == NULL ) {
  193. slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_initialize: cannot initialize condition variable. ");
  194. return( -1 );
  195. }
  196. sync_request_list->sync_req_head = NULL;
  197. sync_request_list->sync_req_cur_persist = 0;
  198. sync_request_list->sync_req_max_persist = SYNC_MAX_CONCURRENT;
  199. if (argc > 0) {
  200. /* for now the only plugin arg is the max concurrent
  201. * persistent sync searches
  202. */
  203. sync_request_list->sync_req_max_persist = sync_number2int(argv[0]);
  204. if (sync_request_list->sync_req_max_persist == -1) {
  205. sync_request_list->sync_req_max_persist = SYNC_MAX_CONCURRENT;
  206. }
  207. }
  208. plugin_closing = 0;
  209. }
  210. return (0);
  211. }
  212. /*
  213. * Add the given pblock to the list of established sync searches.
  214. * Then, start a thread to send the results to the client as they
  215. * are dispatched by add, modify, and modrdn operations.
  216. */
  217. PRThread *
  218. sync_persist_add (Slapi_PBlock *pb)
  219. {
  220. SyncRequest *req = NULL;
  221. char *base;
  222. Slapi_Filter *filter;
  223. if ( SYNC_IS_INITIALIZED() && NULL != pb ) {
  224. /* Create the new node */
  225. req = sync_request_alloc();
  226. slapi_pblock_get(pb, SLAPI_OPERATION, &req->req_orig_op); /* neede to access original op */
  227. req->req_pblock = sync_pblock_copy(pb);
  228. slapi_pblock_get(pb, SLAPI_ORIGINAL_TARGET_DN, &base);
  229. req->req_orig_base = slapi_ch_strdup(base);
  230. slapi_pblock_get( pb, SLAPI_SEARCH_FILTER, &filter );
  231. req->req_filter = slapi_filter_dup(filter);
  232. /* Add it to the head of the list of persistent searches */
  233. if ( 0 == sync_add_request( req )) {
  234. /* Start a thread to send the results */
  235. req->req_tid = PR_CreateThread( PR_USER_THREAD, sync_send_results,
  236. (void *) req, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
  237. PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE );
  238. /* Checking if the thread is succesfully created and
  239. * if the thread is not created succesfully.... we send
  240. * error messages to the Log file
  241. */
  242. if(NULL == (req->req_tid)){
  243. int prerr;
  244. prerr = PR_GetError();
  245. slapi_log_error(SLAPI_LOG_FATAL, "Content Synchronization Search",
  246. "sync_persist_add function: failed to create persitent thread, error %d (%s)\n",
  247. prerr, slapi_pr_strerror(prerr));
  248. /* Now remove the ps from the list so call the function ps_remove */
  249. sync_remove_request(req);
  250. PR_DestroyLock ( req->req_lock );
  251. req->req_lock = NULL;
  252. slapi_ch_free((void **) &req->req_pblock );
  253. slapi_ch_free((void **) &req );
  254. } else {
  255. thread_count++;
  256. return( req->req_tid);
  257. }
  258. }
  259. }
  260. return( NULL);
  261. }
  262. int
  263. sync_persist_startup (PRThread *tid, Sync_Cookie *cookie)
  264. {
  265. SyncRequest *cur;
  266. int rc = 1;
  267. if ( SYNC_IS_INITIALIZED() && NULL != tid ) {
  268. SYNC_LOCK_READ();
  269. /* Find and change */
  270. cur = sync_request_list->sync_req_head;
  271. while ( NULL != cur ) {
  272. if ( cur->req_tid == tid ) {
  273. cur->req_active = PR_TRUE;
  274. cur->req_cookie = cookie;
  275. rc = 0;
  276. break;
  277. }
  278. cur = cur->req_next;
  279. }
  280. SYNC_UNLOCK_READ();
  281. }
  282. return (rc);
  283. }
  284. int
  285. sync_persist_terminate (PRThread *tid)
  286. {
  287. SyncRequest *cur;
  288. int rc = 1;
  289. if ( SYNC_IS_INITIALIZED() && NULL != tid ) {
  290. SYNC_LOCK_READ();
  291. /* Find and change */
  292. cur = sync_request_list->sync_req_head;
  293. while ( NULL != cur ) {
  294. if ( cur->req_tid == tid ) {
  295. cur->req_active = PR_FALSE;
  296. cur->req_complete = PR_TRUE;
  297. rc = 0;
  298. break;
  299. }
  300. cur = cur->req_next;
  301. }
  302. SYNC_UNLOCK_READ();
  303. }
  304. if (rc == 0) {
  305. sync_remove_request(cur);
  306. }
  307. return(rc);
  308. }
  309. /*
  310. * Called when stopping/disabling the plugin
  311. */
  312. int
  313. sync_persist_terminate_all ()
  314. {
  315. if ( SYNC_IS_INITIALIZED() ) {
  316. /* signal the threads to stop */
  317. plugin_closing = 1;
  318. sync_request_wakeup_all();
  319. /* wait for all the threads to finish */
  320. while(thread_count > 0){
  321. PR_Sleep(PR_SecondsToInterval(1));
  322. }
  323. slapi_destroy_rwlock(sync_request_list->sync_req_rwlock);
  324. PR_DestroyLock( sync_request_list->sync_req_cvarlock);
  325. PR_DestroyCondVar(sync_request_list->sync_req_cvar);
  326. slapi_ch_free((void **)&sync_request_list);
  327. }
  328. return (0);
  329. }
  330. /*
  331. * Allocate and initialize an empty Sync node.
  332. */
  333. static SyncRequest *
  334. sync_request_alloc()
  335. {
  336. SyncRequest *req;
  337. req = (SyncRequest *) slapi_ch_calloc( 1, sizeof( SyncRequest ));
  338. req->req_pblock = NULL;
  339. if (( req->req_lock = PR_NewLock()) == NULL ) {
  340. slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM, "sync_request_alloc: cannot initialize lock structure. ");
  341. slapi_ch_free((void **)&req);
  342. return( NULL );
  343. }
  344. req->req_tid = (PRThread *) NULL;
  345. req->req_complete = 0;
  346. req->req_cookie = NULL;
  347. req->ps_eq_head = req->ps_eq_tail = (SyncQueueNode *) NULL;
  348. req->req_next = NULL;
  349. req->req_active = PR_FALSE;
  350. return req;
  351. }
  352. /*
  353. * Add the given persistent search to the
  354. * head of the list of persistent searches.
  355. */
  356. static int
  357. sync_add_request( SyncRequest *req )
  358. {
  359. int rc = 0;
  360. if ( SYNC_IS_INITIALIZED() && NULL != req ) {
  361. SYNC_LOCK_WRITE();
  362. if (sync_request_list->sync_req_cur_persist < sync_request_list->sync_req_max_persist) {
  363. sync_request_list->sync_req_cur_persist++;
  364. req->req_next = sync_request_list->sync_req_head;
  365. sync_request_list->sync_req_head = req;
  366. } else {
  367. rc = 1;
  368. }
  369. SYNC_UNLOCK_WRITE();
  370. }
  371. return(rc);
  372. }
  373. static void
  374. sync_remove_request( SyncRequest *req )
  375. {
  376. SyncRequest *cur;
  377. int removed = 0;
  378. if ( SYNC_IS_INITIALIZED() && NULL != req ) {
  379. SYNC_LOCK_WRITE();
  380. if ( NULL == sync_request_list->sync_req_head ) {
  381. /* should not happen, attempt to remove a request never added */
  382. } else if ( req == sync_request_list->sync_req_head ) {
  383. /* Remove from head */
  384. sync_request_list->sync_req_head = sync_request_list->sync_req_head->req_next;
  385. removed = 1;
  386. } else {
  387. /* Find and remove from list */
  388. cur = sync_request_list->sync_req_head;
  389. while ( NULL != cur->req_next ) {
  390. if ( cur->req_next == req ) {
  391. cur->req_next = cur->req_next->req_next;
  392. removed = 1;
  393. break;
  394. } else {
  395. cur = cur->req_next;
  396. }
  397. }
  398. }
  399. if (removed) {
  400. sync_request_list->sync_req_cur_persist--;
  401. }
  402. SYNC_UNLOCK_WRITE();
  403. if (!removed) {
  404. slapi_log_error (SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "attempt to remove nonexistent req");
  405. }
  406. }
  407. }
  408. static void
  409. sync_request_wakeup_all()
  410. {
  411. if ( SYNC_IS_INITIALIZED()) {
  412. PR_Lock( sync_request_list->sync_req_cvarlock );
  413. PR_NotifyAllCondVar( sync_request_list->sync_req_cvar );
  414. PR_Unlock( sync_request_list->sync_req_cvarlock );
  415. }
  416. }
  417. static int
  418. sync_acquire_connection (Slapi_Connection *conn)
  419. {
  420. int rc;
  421. /* need to acquire a reference to this connection so that it will not
  422. be released or cleaned up out from under us
  423. in psearch.c it is implemented as:
  424. PR_Lock( ps->req_pblock->pb_conn->c_mutex );
  425. conn_acq_flag = connection_acquire_nolock(ps->req_pblock->pb_conn);
  426. PR_Unlock( ps->req_pblock->pb_conn->c_mutex );
  427. HOW TO DO FROM A PLUGIN
  428. - either expose the functions from the connection code in the private api
  429. and allow to link them in
  430. - or fake a connection structure
  431. struct fake_conn {
  432. void *needed1
  433. void *needed2
  434. void *pad1
  435. void *pad2
  436. void *needed3;
  437. }
  438. struct fake_conn *c = (struct fake_conn *) conn;
  439. c->needed3 ++;
  440. this would require knowledge or analysis of the connection structure,
  441. could probably be done for servers with a common history
  442. */
  443. /* use exposed slapi_connection functions */
  444. rc = slapi_connection_acquire(conn);
  445. return (rc);
  446. }
  447. static int
  448. sync_release_connection (Slapi_PBlock *pb, Slapi_Connection *conn, Slapi_Operation *op, int release)
  449. {
  450. /* see comments in sync_acquire_connection */
  451. /* using exposed connection handling functions */
  452. slapi_connection_remove_operation(pb, conn, op, release);
  453. return(0);
  454. }
  455. /*
  456. * Thread routine for sending search results to a client
  457. * which is persistently waiting for them.
  458. *
  459. * This routine will terminate when either (a) the ps_complete
  460. * flag is set, or (b) the associated operation is abandoned.
  461. * In any case, the thread won't notice until it wakes from
  462. * sleeping on the ps_list condition variable, so it needs
  463. * to be awakened.
  464. */
  465. static void
  466. sync_send_results( void *arg )
  467. {
  468. SyncRequest *req = (SyncRequest *)arg;
  469. SyncQueueNode *qnode, *qnodenext;
  470. int conn_acq_flag = 0;
  471. Slapi_Connection *conn = NULL;
  472. Slapi_Operation *op = req->req_orig_op;
  473. int rc;
  474. PRUint64 connid;
  475. int opid;
  476. slapi_pblock_get(req->req_pblock, SLAPI_CONN_ID, &connid);
  477. slapi_pblock_get(req->req_pblock, SLAPI_OPERATION_ID, &opid);
  478. slapi_pblock_get(req->req_pblock, SLAPI_CONNECTION, &conn);
  479. if (NULL == conn) {
  480. slapi_log_error(SLAPI_LOG_FATAL, "Content Synchronization Search",
  481. "conn=%" NSPRIu64 " op=%d Null connection - aborted\n",
  482. connid, opid);
  483. return;
  484. }
  485. conn_acq_flag = sync_acquire_connection (conn);
  486. if (conn_acq_flag) {
  487. slapi_log_error(SLAPI_LOG_FATAL, "Content Synchronization Search",
  488. "conn=%" NSPRIu64 " op=%d Could not acquire the connection - aborted\n",
  489. connid, opid);
  490. return;
  491. }
  492. PR_Lock( sync_request_list->sync_req_cvarlock );
  493. while ( (conn_acq_flag == 0) && !req->req_complete && !plugin_closing) {
  494. /* Check for an abandoned operation */
  495. if ( op == NULL || slapi_is_operation_abandoned( op ) ) {
  496. slapi_log_error(SLAPI_LOG_PLUGIN, "Content Synchronization Search",
  497. "conn=%" NSPRIu64 " op=%d Operation no longer active - terminating\n",
  498. connid, opid);
  499. break;
  500. }
  501. if ( NULL == req->ps_eq_head || !req->req_active) {
  502. /* Nothing to do yet, or the refresh phase is not yet completed */
  503. /* If an operation is abandoned, we do not get notified by the
  504. * connection code. Wake up every second to check if thread
  505. * should terminate.
  506. */
  507. PR_WaitCondVar( sync_request_list->sync_req_cvar, PR_SecondsToInterval(1) );
  508. } else {
  509. /* dequeue the item */
  510. int attrsonly;
  511. char **attrs;
  512. char **noattrs = NULL;
  513. LDAPControl **ectrls = NULL;
  514. Slapi_Entry *ec;
  515. int chg_type = LDAP_SYNC_NONE;
  516. /* deque one element */
  517. PR_Lock( req->req_lock );
  518. qnode = req->ps_eq_head;
  519. req->ps_eq_head = qnode->sync_next;
  520. if ( NULL == req->ps_eq_head ) {
  521. req->ps_eq_tail = NULL;
  522. }
  523. PR_Unlock( req->req_lock );
  524. /* Get all the information we need to send the result */
  525. ec = qnode->sync_entry;
  526. slapi_pblock_get( req->req_pblock, SLAPI_SEARCH_ATTRS, &attrs );
  527. slapi_pblock_get( req->req_pblock, SLAPI_SEARCH_ATTRSONLY, &attrsonly );
  528. /*
  529. * Send the result. Since send_ldap_search_entry can block for
  530. * up to 30 minutes, we relinquish all locks before calling it.
  531. */
  532. PR_Unlock(sync_request_list->sync_req_cvarlock);
  533. /*
  534. * The entry is in the right scope and matches the filter
  535. * but we need to redo the filter test here to check access
  536. * controls. See the comments at the slapi_filter_test()
  537. * call in sync_persist_add().
  538. */
  539. if ( slapi_vattr_filter_test( req->req_pblock, ec, req->req_filter,
  540. 1 /* verify_access */ ) == 0 ) {
  541. slapi_pblock_set( req->req_pblock, SLAPI_SEARCH_RESULT_ENTRY, ec );
  542. /* NEED TO BUILD THE CONTROL */
  543. switch (qnode->sync_chgtype){
  544. case LDAP_REQ_ADD:
  545. chg_type = LDAP_SYNC_ADD;
  546. break;
  547. case LDAP_REQ_MODIFY:
  548. chg_type = LDAP_SYNC_MODIFY;
  549. break;
  550. case LDAP_REQ_MODRDN:
  551. chg_type = LDAP_SYNC_MODIFY;
  552. break;
  553. case LDAP_REQ_DELETE:
  554. chg_type = LDAP_SYNC_DELETE;
  555. noattrs = (char **)slapi_ch_calloc(2, sizeof (char *));
  556. noattrs[0] = slapi_ch_strdup("1.1");
  557. noattrs[1] = NULL;
  558. break;
  559. }
  560. ectrls = (LDAPControl **)slapi_ch_calloc(2, sizeof (LDAPControl *));
  561. if (req->req_cookie)
  562. sync_cookie_update(req->req_cookie, ec);
  563. sync_create_state_control(ec, &ectrls[0], chg_type, req->req_cookie);
  564. rc = slapi_send_ldap_search_entry( req->req_pblock,
  565. ec, ectrls,
  566. noattrs?noattrs:attrs, attrsonly );
  567. if (rc) {
  568. slapi_log_error(SLAPI_LOG_CONNS, SYNC_PLUGIN_SUBSYSTEM,
  569. "Error %d sending entry %s\n",
  570. rc, slapi_entry_get_dn_const(ec));
  571. }
  572. ldap_controls_free(ectrls);
  573. slapi_ch_array_free(noattrs);
  574. }
  575. PR_Lock(sync_request_list->sync_req_cvarlock);
  576. /* Deallocate our wrapper for this entry */
  577. sync_node_free( &qnode );
  578. }
  579. }
  580. PR_Unlock( sync_request_list->sync_req_cvarlock );
  581. sync_remove_request( req );
  582. /* indicate the end of search */
  583. sync_release_connection(req->req_pblock, conn, op, conn_acq_flag == 0);
  584. PR_DestroyLock ( req->req_lock );
  585. req->req_lock = NULL;
  586. slapi_ch_free((void **) &req->req_pblock );
  587. slapi_ch_free((void **) &req->req_orig_base );
  588. slapi_filter_free(req->req_filter, 1);
  589. sync_cookie_free(&req->req_cookie);
  590. for ( qnode = req->ps_eq_head; qnode; qnode = qnodenext) {
  591. qnodenext = qnode->sync_next;
  592. sync_node_free( &qnode );
  593. }
  594. slapi_ch_free((void **) &req );
  595. thread_count--;
  596. }
  597. /*
  598. * Free a sync update node (and everything it holds).
  599. */
  600. static void
  601. sync_node_free( SyncQueueNode **node )
  602. {
  603. if ( node != NULL && *node != NULL ) {
  604. if ( (*node)->sync_entry != NULL ) {
  605. slapi_entry_free( (*node)->sync_entry );
  606. (*node)->sync_entry = NULL;
  607. }
  608. slapi_ch_free( (void **)node );
  609. }
  610. }