sync_persist.c 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725
  1. /** BEGIN COPYRIGHT BLOCK
  2. ent sync_srch_refresh_pre_op(Slapi_PBlock *pb);
  3. * This Program is free software; you can redistribute it and/or modify it under
  4. * the terms of the GNU General Public License as published by the Free Software
  5. * Foundation; version 2 of the License.
  6. *
  7. * This Program is distributed in the hope that it will be useful, but WITHOUT
  8. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  9. * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  10. *
  11. * You should have received a copy of the GNU General Public License along with
  12. * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
  13. * Place, Suite 330, Boston, MA 02111-1307 USA.
  14. *
  15. * In addition, as a special exception, Red Hat, Inc. gives You the additional
  16. * right to link the code of this Program with code not covered under the GNU
  17. * General Public License ("Non-GPL Code") and to distribute linked combinations
  18. * including the two, subject to the limitations in this paragraph. Non-GPL Code
  19. * permitted under this exception must only link to the code of this Program
  20. * through those well defined interfaces identified in the file named EXCEPTION
  21. * found in the source code files (the "Approved Interfaces"). The files of
  22. * Non-GPL Code may instantiate templates or use macros or inline functions from
  23. * the Approved Interfaces without causing the resulting work to be covered by
  24. * the GNU General Public License. Only Red Hat, Inc. may make changes or
  25. * additions to the list of Approved Interfaces. You must obey the GNU General
  26. * Public License in all respects for all of the Program code and other code used
  27. * in conjunction with the Program except the Non-GPL Code covered by this
  28. * exception. If you modify this file, you may extend this exception to your
  29. * version of the file, but you are not obligated to do so. If you do not wish to
  30. * provide this exception without modification, you must delete this exception
  31. * statement from your version and license this file solely under the GPL without
  32. * exception.
  33. *
  34. *
  35. * Copyright (C) 2013 Red Hat, Inc.
  36. * All rights reserved.
  37. * END COPYRIGHT BLOCK **/
  38. #include "sync.h"
  39. /* Main list of established persistent synchronizaton searches */
  40. static SyncRequestList *sync_request_list = NULL;
  41. /*
  42. * Convenience macros for locking the list of persistent searches
  43. */
  44. #define SYNC_LOCK_READ() slapi_rwlock_rdlock(sync_request_list->sync_req_rwlock)
  45. #define SYNC_UNLOCK_READ() slapi_rwlock_unlock(sync_request_list->sync_req_rwlock)
  46. #define SYNC_LOCK_WRITE() slapi_rwlock_wrlock(sync_request_list->sync_req_rwlock)
  47. #define SYNC_UNLOCK_WRITE() slapi_rwlock_unlock(sync_request_list->sync_req_rwlock)
  48. /*
  49. * Convenience macro for checking if the Content Synchronization subsystem has
  50. * been initialized.
  51. */
  52. #define SYNC_IS_INITIALIZED() (sync_request_list != NULL)
  53. static int plugin_closing = 0;
  54. static PRUint64 thread_count = 0;
  55. static int sync_add_request( SyncRequest *req );
  56. static void sync_remove_request( SyncRequest *req );
  57. static SyncRequest *sync_request_alloc();
  58. void sync_queue_change( Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype );
  59. static void sync_send_results( void *arg );
  60. static void sync_request_wakeup_all();
  61. static void sync_node_free( SyncQueueNode **node );
  62. static int sync_acquire_connection (Slapi_Connection *conn);
  63. static int sync_release_connection (Slapi_PBlock *pb, Slapi_Connection *conn, Slapi_Operation *op, int release);
  64. int sync_add_persist_post_op(Slapi_PBlock *pb)
  65. {
  66. Slapi_Entry *e;
  67. if ( !SYNC_IS_INITIALIZED()) {
  68. return(0);
  69. }
  70. slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
  71. sync_queue_change(e, NULL, LDAP_REQ_ADD);
  72. return( 0 );
  73. }
  74. int sync_del_persist_post_op(Slapi_PBlock *pb)
  75. {
  76. Slapi_Entry *e;
  77. if ( !SYNC_IS_INITIALIZED()) {
  78. return(0);
  79. }
  80. slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e);
  81. sync_queue_change(e, NULL, LDAP_REQ_DELETE);
  82. return( 0 );
  83. }
  84. int sync_mod_persist_post_op(Slapi_PBlock *pb)
  85. {
  86. Slapi_Entry *e, *e_prev;
  87. if ( !SYNC_IS_INITIALIZED()) {
  88. return(0);
  89. }
  90. slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
  91. slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e_prev);
  92. sync_queue_change(e, e_prev, LDAP_REQ_MODIFY);
  93. return( 0 );
  94. }
  95. int sync_modrdn_persist_post_op(Slapi_PBlock *pb)
  96. {
  97. Slapi_Entry *e, *e_prev;
  98. if ( !SYNC_IS_INITIALIZED()) {
  99. return(0);
  100. }
  101. slapi_pblock_get(pb, SLAPI_ENTRY_POST_OP, &e);
  102. slapi_pblock_get(pb, SLAPI_ENTRY_PRE_OP, &e_prev);
  103. sync_queue_change(e, e_prev, LDAP_REQ_MODRDN);
  104. return( 0 );
  105. }
  106. void
  107. sync_queue_change( Slapi_Entry *e, Slapi_Entry *eprev, ber_int_t chgtype )
  108. {
  109. SyncRequest *req = NULL;
  110. SyncQueueNode *node = NULL;
  111. int matched = 0;
  112. int prev_match = 0;
  113. int cur_match = 0;
  114. if ( !SYNC_IS_INITIALIZED()) {
  115. return;
  116. }
  117. if ( NULL == e ) {
  118. /* For now, some backends such as the chaining backend do not provide a post-op entry */
  119. return;
  120. }
  121. SYNC_LOCK_READ();
  122. for ( req = sync_request_list->sync_req_head; NULL != req; req = req->req_next ) {
  123. Slapi_DN *base = NULL;
  124. int scope;
  125. Slapi_Operation *op;
  126. /* Skip the nodes that have no more active operation
  127. */
  128. slapi_pblock_get( req->req_pblock, SLAPI_OPERATION, &op );
  129. if ( op == NULL || slapi_op_abandoned( req->req_pblock ) ) {
  130. continue;
  131. }
  132. slapi_pblock_get( req->req_pblock, SLAPI_SEARCH_TARGET_SDN, &base );
  133. slapi_pblock_get( req->req_pblock, SLAPI_SEARCH_SCOPE, &scope );
  134. if (NULL == base) {
  135. base = slapi_sdn_new_dn_byref(req->req_orig_base);
  136. slapi_pblock_set(req->req_pblock, SLAPI_SEARCH_TARGET_SDN, base);
  137. }
  138. /*
  139. * See if the entry meets the scope and filter criteria.
  140. * We cannot do the acl check here as this thread
  141. * would then potentially clash with the ps_send_results()
  142. * thread on the aclpb in ps->req_pblock.
  143. * By avoiding the acl check in this thread, and leaving all the acl
  144. * checking to the ps_send_results() thread we avoid
  145. * the req_pblock contention problem.
  146. * The lesson here is "Do not give multiple threads arbitary access
  147. * to the same pblock" this kind of muti-threaded access
  148. * to the same pblock must be done carefully--there is currently no
  149. * generic satisfactory way to do this.
  150. */
  151. /* if the change is a modrdn then we need to check if the entry was
  152. * moved into scope, out of scope, or stays in scope
  153. */
  154. if (chgtype == LDAP_REQ_MODRDN || chgtype == LDAP_REQ_MODIFY)
  155. prev_match = slapi_sdn_scope_test( slapi_entry_get_sdn_const(eprev), base, scope ) &&
  156. ( 0 == slapi_vattr_filter_test( req->req_pblock, eprev, req->req_filter, 0 /* verify_access */ ));
  157. cur_match = slapi_sdn_scope_test( slapi_entry_get_sdn_const(e), base, scope ) &&
  158. ( 0 == slapi_vattr_filter_test( req->req_pblock, e, req->req_filter, 0 /* verify_access */ ));
  159. if (prev_match || cur_match) {
  160. SyncQueueNode *pOldtail;
  161. /* The scope and the filter match - enqueue it */
  162. matched++;
  163. node = (SyncQueueNode *)slapi_ch_calloc( 1, sizeof( SyncQueueNode ));
  164. if ( chgtype == LDAP_REQ_MODRDN || chgtype == LDAP_REQ_MODIFY) {
  165. if (prev_match && cur_match)
  166. node->sync_chgtype = LDAP_REQ_MODIFY;
  167. else if (prev_match)
  168. node->sync_chgtype = LDAP_REQ_DELETE;
  169. else
  170. node->sync_chgtype = LDAP_REQ_ADD;
  171. } else {
  172. node->sync_chgtype = chgtype;
  173. }
  174. if (node->sync_chgtype == LDAP_REQ_DELETE && chgtype == LDAP_REQ_MODIFY ) {
  175. /* use previous entry to pass the filter test in sync_send_results */
  176. node->sync_entry = slapi_entry_dup( eprev );
  177. } else {
  178. node->sync_entry = slapi_entry_dup( e );
  179. }
  180. /* Put it on the end of the list for this sync search */
  181. PR_Lock( req->req_lock );
  182. pOldtail = req->ps_eq_tail;
  183. req->ps_eq_tail = node;
  184. if ( NULL == req->ps_eq_head ) {
  185. req->ps_eq_head = req->ps_eq_tail;
  186. }
  187. else {
  188. pOldtail->sync_next = req->ps_eq_tail;
  189. }
  190. PR_Unlock( req->req_lock );
  191. }
  192. }
  193. SYNC_UNLOCK_READ();
  194. /* Were there any matches? */
  195. if ( matched ) {
  196. /* Notify update threads */
  197. sync_request_wakeup_all();
  198. slapi_log_error (SLAPI_LOG_TRACE, SYNC_PLUGIN_SUBSYSTEM, "sync search: enqueued entry "
  199. "\"%s\" on %d request listeners\n", slapi_entry_get_dn_const(e), matched );
  200. } else {
  201. slapi_log_error (SLAPI_LOG_TRACE, SYNC_PLUGIN_SUBSYSTEM, "sync search: entry "
  202. "\"%s\" not enqueued on any request search listeners\n", slapi_entry_get_dn_const(e) );
  203. }
  204. }
  205. /*
  206. * Initialize the list structure which contains the list
  207. * of established content sync persistent requests
  208. */
  209. int
  210. sync_persist_initialize (int argc, char **argv)
  211. {
  212. if ( !SYNC_IS_INITIALIZED()) {
  213. sync_request_list = (SyncRequestList *) slapi_ch_calloc( 1, sizeof( SyncRequestList ));
  214. if (( sync_request_list->sync_req_rwlock = slapi_new_rwlock()) == NULL ) {
  215. slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_initialize: cannot initialize lock structure(1). ");
  216. return( -1 );
  217. }
  218. if (( sync_request_list->sync_req_cvarlock = PR_NewLock()) == NULL ) {
  219. slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_initialize: cannot initialize lock structure(2). ");
  220. return( -1 );
  221. }
  222. if (( sync_request_list->sync_req_cvar = PR_NewCondVar( sync_request_list->sync_req_cvarlock )) == NULL ) {
  223. slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_initialize: cannot initialize condition variable. ");
  224. return( -1 );
  225. }
  226. sync_request_list->sync_req_head = NULL;
  227. sync_request_list->sync_req_cur_persist = 0;
  228. sync_request_list->sync_req_max_persist = SYNC_MAX_CONCURRENT;
  229. if (argc > 0) {
  230. /* for now the only plugin arg is the max concurrent
  231. * persistent sync searches
  232. */
  233. sync_request_list->sync_req_max_persist = sync_number2int(argv[0]);
  234. if (sync_request_list->sync_req_max_persist == -1) {
  235. sync_request_list->sync_req_max_persist = SYNC_MAX_CONCURRENT;
  236. }
  237. }
  238. plugin_closing = 0;
  239. }
  240. return (0);
  241. }
  242. /*
  243. * Add the given pblock to the list of established sync searches.
  244. * Then, start a thread to send the results to the client as they
  245. * are dispatched by add, modify, and modrdn operations.
  246. */
  247. PRThread *
  248. sync_persist_add (Slapi_PBlock *pb)
  249. {
  250. SyncRequest *req = NULL;
  251. char *base;
  252. Slapi_Filter *filter;
  253. if ( SYNC_IS_INITIALIZED() && NULL != pb ) {
  254. /* Create the new node */
  255. req = sync_request_alloc();
  256. slapi_pblock_get(pb, SLAPI_OPERATION, &req->req_orig_op); /* neede to access original op */
  257. req->req_pblock = sync_pblock_copy(pb);
  258. slapi_pblock_get(pb, SLAPI_ORIGINAL_TARGET_DN, &base);
  259. req->req_orig_base = slapi_ch_strdup(base);
  260. slapi_pblock_get( pb, SLAPI_SEARCH_FILTER, &filter );
  261. req->req_filter = slapi_filter_dup(filter);
  262. /* Add it to the head of the list of persistent searches */
  263. if ( 0 == sync_add_request( req )) {
  264. /* Start a thread to send the results */
  265. req->req_tid = PR_CreateThread( PR_USER_THREAD, sync_send_results,
  266. (void *) req, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
  267. PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE );
  268. /* Checking if the thread is succesfully created and
  269. * if the thread is not created succesfully.... we send
  270. * error messages to the Log file
  271. */
  272. if(NULL == (req->req_tid)){
  273. int prerr;
  274. prerr = PR_GetError();
  275. slapi_log_error(SLAPI_LOG_FATAL, "Content Synchronization Search",
  276. "sync_persist_add function: failed to create persitent thread, error %d (%s)\n",
  277. prerr, slapi_pr_strerror(prerr));
  278. /* Now remove the ps from the list so call the function ps_remove */
  279. sync_remove_request(req);
  280. PR_DestroyLock ( req->req_lock );
  281. req->req_lock = NULL;
  282. slapi_ch_free((void **) &req->req_pblock );
  283. slapi_ch_free((void **) &req );
  284. } else {
  285. thread_count++;
  286. return( req->req_tid);
  287. }
  288. }
  289. }
  290. return( NULL);
  291. }
  292. int
  293. sync_persist_startup (PRThread *tid, Sync_Cookie *cookie)
  294. {
  295. SyncRequest *cur;
  296. int rc = 1;
  297. if ( SYNC_IS_INITIALIZED() && NULL != tid ) {
  298. SYNC_LOCK_READ();
  299. /* Find and change */
  300. cur = sync_request_list->sync_req_head;
  301. while ( NULL != cur ) {
  302. if ( cur->req_tid == tid ) {
  303. cur->req_active = PR_TRUE;
  304. cur->req_cookie = cookie;
  305. rc = 0;
  306. break;
  307. }
  308. cur = cur->req_next;
  309. }
  310. SYNC_UNLOCK_READ();
  311. }
  312. return (rc);
  313. }
  314. int
  315. sync_persist_terminate (PRThread *tid)
  316. {
  317. SyncRequest *cur;
  318. int rc = 1;
  319. if ( SYNC_IS_INITIALIZED() && NULL != tid ) {
  320. SYNC_LOCK_READ();
  321. /* Find and change */
  322. cur = sync_request_list->sync_req_head;
  323. while ( NULL != cur ) {
  324. if ( cur->req_tid == tid ) {
  325. cur->req_active = PR_FALSE;
  326. cur->req_complete = PR_TRUE;
  327. rc = 0;
  328. break;
  329. }
  330. cur = cur->req_next;
  331. }
  332. SYNC_UNLOCK_READ();
  333. }
  334. if (rc == 0) {
  335. sync_remove_request(cur);
  336. }
  337. return(rc);
  338. }
  339. /*
  340. * Called when stopping/disabling the plugin
  341. */
  342. int
  343. sync_persist_terminate_all ()
  344. {
  345. if ( SYNC_IS_INITIALIZED() ) {
  346. /* signal the threads to stop */
  347. plugin_closing = 1;
  348. sync_request_wakeup_all();
  349. /* wait for all the threads to finish */
  350. while(thread_count > 0){
  351. PR_Sleep(PR_SecondsToInterval(1));
  352. }
  353. slapi_destroy_rwlock(sync_request_list->sync_req_rwlock);
  354. PR_DestroyLock( sync_request_list->sync_req_cvarlock);
  355. PR_DestroyCondVar(sync_request_list->sync_req_cvar);
  356. slapi_ch_free((void **)&sync_request_list);
  357. }
  358. return (0);
  359. }
  360. /*
  361. * Allocate and initialize an empty Sync node.
  362. */
  363. static SyncRequest *
  364. sync_request_alloc()
  365. {
  366. SyncRequest *req;
  367. req = (SyncRequest *) slapi_ch_calloc( 1, sizeof( SyncRequest ));
  368. req->req_pblock = NULL;
  369. if (( req->req_lock = PR_NewLock()) == NULL ) {
  370. slapi_log_error (SLAPI_LOG_FATAL, SYNC_PLUGIN_SUBSYSTEM, "sync_request_alloc: cannot initialize lock structure. ");
  371. slapi_ch_free((void **)&req);
  372. return( NULL );
  373. }
  374. req->req_tid = (PRThread *) NULL;
  375. req->req_complete = 0;
  376. req->req_cookie = NULL;
  377. req->ps_eq_head = req->ps_eq_tail = (SyncQueueNode *) NULL;
  378. req->req_next = NULL;
  379. req->req_active = PR_FALSE;
  380. return req;
  381. }
  382. /*
  383. * Add the given persistent search to the
  384. * head of the list of persistent searches.
  385. */
  386. static int
  387. sync_add_request( SyncRequest *req )
  388. {
  389. int rc = 0;
  390. if ( SYNC_IS_INITIALIZED() && NULL != req ) {
  391. SYNC_LOCK_WRITE();
  392. if (sync_request_list->sync_req_cur_persist < sync_request_list->sync_req_max_persist) {
  393. sync_request_list->sync_req_cur_persist++;
  394. req->req_next = sync_request_list->sync_req_head;
  395. sync_request_list->sync_req_head = req;
  396. } else {
  397. rc = 1;
  398. }
  399. SYNC_UNLOCK_WRITE();
  400. }
  401. return(rc);
  402. }
  403. static void
  404. sync_remove_request( SyncRequest *req )
  405. {
  406. SyncRequest *cur;
  407. int removed = 0;
  408. if ( SYNC_IS_INITIALIZED() && NULL != req ) {
  409. SYNC_LOCK_WRITE();
  410. if ( NULL == sync_request_list->sync_req_head ) {
  411. /* should not happen, attempt to remove a request never added */
  412. } else if ( req == sync_request_list->sync_req_head ) {
  413. /* Remove from head */
  414. sync_request_list->sync_req_head = sync_request_list->sync_req_head->req_next;
  415. removed = 1;
  416. } else {
  417. /* Find and remove from list */
  418. cur = sync_request_list->sync_req_head;
  419. while ( NULL != cur->req_next ) {
  420. if ( cur->req_next == req ) {
  421. cur->req_next = cur->req_next->req_next;
  422. removed = 1;
  423. break;
  424. } else {
  425. cur = cur->req_next;
  426. }
  427. }
  428. }
  429. if (removed) {
  430. sync_request_list->sync_req_cur_persist--;
  431. }
  432. SYNC_UNLOCK_WRITE();
  433. if (!removed) {
  434. slapi_log_error (SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "attempt to remove nonexistent req");
  435. }
  436. }
  437. }
  438. static void
  439. sync_request_wakeup_all()
  440. {
  441. if ( SYNC_IS_INITIALIZED()) {
  442. PR_Lock( sync_request_list->sync_req_cvarlock );
  443. PR_NotifyAllCondVar( sync_request_list->sync_req_cvar );
  444. PR_Unlock( sync_request_list->sync_req_cvarlock );
  445. }
  446. }
  447. static int
  448. sync_acquire_connection (Slapi_Connection *conn)
  449. {
  450. int rc;
  451. /* need to acquire a reference to this connection so that it will not
  452. be released or cleaned up out from under us
  453. in psearch.c it is implemented as:
  454. PR_Lock( ps->req_pblock->pb_conn->c_mutex );
  455. conn_acq_flag = connection_acquire_nolock(ps->req_pblock->pb_conn);
  456. PR_Unlock( ps->req_pblock->pb_conn->c_mutex );
  457. HOW TO DO FROM A PLUGIN
  458. - either expose the functions from the connection code in the private api
  459. and allow to link them in
  460. - or fake a connection structure
  461. struct fake_conn {
  462. void *needed1
  463. void *needed2
  464. void *pad1
  465. void *pad2
  466. void *needed3;
  467. }
  468. struct fake_conn *c = (struct fake_conn *) conn;
  469. c->needed3 ++;
  470. this would require knowledge or analysis of the connection structure,
  471. could probably be done for servers with a common history
  472. */
  473. /* use exposed slapi_connection functions */
  474. rc = slapi_connection_acquire(conn);
  475. return (rc);
  476. }
  477. static int
  478. sync_release_connection (Slapi_PBlock *pb, Slapi_Connection *conn, Slapi_Operation *op, int release)
  479. {
  480. /* see comments in sync_acquire_connection */
  481. /* using exposed connection handling functions */
  482. slapi_connection_remove_operation(pb, conn, op, release);
  483. return(0);
  484. }
  485. /*
  486. * Thread routine for sending search results to a client
  487. * which is persistently waiting for them.
  488. *
  489. * This routine will terminate when either (a) the ps_complete
  490. * flag is set, or (b) the associated operation is abandoned.
  491. * In any case, the thread won't notice until it wakes from
  492. * sleeping on the ps_list condition variable, so it needs
  493. * to be awakened.
  494. */
  495. static void
  496. sync_send_results( void *arg )
  497. {
  498. SyncRequest *req = (SyncRequest *)arg;
  499. SyncQueueNode *qnode, *qnodenext;
  500. int conn_acq_flag = 0;
  501. Slapi_Connection *conn = NULL;
  502. Slapi_Operation *op = req->req_orig_op;
  503. int rc;
  504. PRUint64 connid;
  505. int opid;
  506. slapi_pblock_get(req->req_pblock, SLAPI_CONN_ID, &connid);
  507. slapi_pblock_get(req->req_pblock, SLAPI_OPERATION_ID, &opid);
  508. slapi_pblock_get(req->req_pblock, SLAPI_CONNECTION, &conn);
  509. if (NULL == conn) {
  510. slapi_log_error(SLAPI_LOG_FATAL, "Content Synchronization Search",
  511. "conn=%" NSPRIu64 " op=%d Null connection - aborted\n",
  512. connid, opid);
  513. return;
  514. }
  515. conn_acq_flag = sync_acquire_connection (conn);
  516. if (conn_acq_flag) {
  517. slapi_log_error(SLAPI_LOG_FATAL, "Content Synchronization Search",
  518. "conn=%" NSPRIu64 " op=%d Could not acquire the connection - aborted\n",
  519. connid, opid);
  520. return;
  521. }
  522. PR_Lock( sync_request_list->sync_req_cvarlock );
  523. while ( (conn_acq_flag == 0) && !req->req_complete && !plugin_closing) {
  524. /* Check for an abandoned operation */
  525. if ( op == NULL || slapi_is_operation_abandoned( op ) ) {
  526. slapi_log_error(SLAPI_LOG_PLUGIN, "Content Synchronization Search",
  527. "conn=%" NSPRIu64 " op=%d Operation no longer active - terminating\n",
  528. connid, opid);
  529. break;
  530. }
  531. if ( NULL == req->ps_eq_head || !req->req_active) {
  532. /* Nothing to do yet, or the refresh phase is not yet completed */
  533. /* If an operation is abandoned, we do not get notified by the
  534. * connection code. Wake up every second to check if thread
  535. * should terminate.
  536. */
  537. PR_WaitCondVar( sync_request_list->sync_req_cvar, PR_SecondsToInterval(1) );
  538. } else {
  539. /* dequeue the item */
  540. int attrsonly;
  541. char **attrs;
  542. char **noattrs = NULL;
  543. LDAPControl **ectrls = NULL;
  544. Slapi_Entry *ec;
  545. int chg_type = LDAP_SYNC_NONE;
  546. /* deque one element */
  547. PR_Lock( req->req_lock );
  548. qnode = req->ps_eq_head;
  549. req->ps_eq_head = qnode->sync_next;
  550. if ( NULL == req->ps_eq_head ) {
  551. req->ps_eq_tail = NULL;
  552. }
  553. PR_Unlock( req->req_lock );
  554. /* Get all the information we need to send the result */
  555. ec = qnode->sync_entry;
  556. slapi_pblock_get( req->req_pblock, SLAPI_SEARCH_ATTRS, &attrs );
  557. slapi_pblock_get( req->req_pblock, SLAPI_SEARCH_ATTRSONLY, &attrsonly );
  558. /*
  559. * Send the result. Since send_ldap_search_entry can block for
  560. * up to 30 minutes, we relinquish all locks before calling it.
  561. */
  562. PR_Unlock(sync_request_list->sync_req_cvarlock);
  563. /*
  564. * The entry is in the right scope and matches the filter
  565. * but we need to redo the filter test here to check access
  566. * controls. See the comments at the slapi_filter_test()
  567. * call in sync_persist_add().
  568. */
  569. if ( slapi_vattr_filter_test( req->req_pblock, ec, req->req_filter,
  570. 1 /* verify_access */ ) == 0 ) {
  571. slapi_pblock_set( req->req_pblock, SLAPI_SEARCH_RESULT_ENTRY, ec );
  572. /* NEED TO BUILD THE CONTROL */
  573. switch (qnode->sync_chgtype){
  574. case LDAP_REQ_ADD:
  575. chg_type = LDAP_SYNC_ADD;
  576. break;
  577. case LDAP_REQ_MODIFY:
  578. chg_type = LDAP_SYNC_MODIFY;
  579. break;
  580. case LDAP_REQ_MODRDN:
  581. chg_type = LDAP_SYNC_MODIFY;
  582. break;
  583. case LDAP_REQ_DELETE:
  584. chg_type = LDAP_SYNC_DELETE;
  585. noattrs = (char **)slapi_ch_calloc(2, sizeof (char *));
  586. noattrs[0] = slapi_ch_strdup("1.1");
  587. noattrs[1] = NULL;
  588. break;
  589. }
  590. ectrls = (LDAPControl **)slapi_ch_calloc(2, sizeof (LDAPControl *));
  591. if (req->req_cookie)
  592. sync_cookie_update(req->req_cookie, ec);
  593. sync_create_state_control(ec, &ectrls[0], chg_type, req->req_cookie);
  594. rc = slapi_send_ldap_search_entry( req->req_pblock,
  595. ec, ectrls,
  596. noattrs?noattrs:attrs, attrsonly );
  597. if (rc) {
  598. slapi_log_error(SLAPI_LOG_CONNS, SYNC_PLUGIN_SUBSYSTEM,
  599. "Error %d sending entry %s\n",
  600. rc, slapi_entry_get_dn_const(ec));
  601. }
  602. ldap_controls_free(ectrls);
  603. slapi_ch_array_free(noattrs);
  604. }
  605. PR_Lock(sync_request_list->sync_req_cvarlock);
  606. /* Deallocate our wrapper for this entry */
  607. sync_node_free( &qnode );
  608. }
  609. }
  610. PR_Unlock( sync_request_list->sync_req_cvarlock );
  611. sync_remove_request( req );
  612. /* indicate the end of search */
  613. sync_release_connection(req->req_pblock, conn, op, conn_acq_flag == 0);
  614. PR_DestroyLock ( req->req_lock );
  615. req->req_lock = NULL;
  616. slapi_ch_free((void **) &req->req_pblock );
  617. slapi_ch_free((void **) &req->req_orig_base );
  618. slapi_filter_free(req->req_filter, 1);
  619. sync_cookie_free(&req->req_cookie);
  620. for ( qnode = req->ps_eq_head; qnode; qnode = qnodenext) {
  621. qnodenext = qnode->sync_next;
  622. sync_node_free( &qnode );
  623. }
  624. slapi_ch_free((void **) &req );
  625. thread_count--;
  626. }
  627. /*
  628. * Free a sync update node (and everything it holds).
  629. */
  630. static void
  631. sync_node_free( SyncQueueNode **node )
  632. {
  633. if ( node != NULL && *node != NULL ) {
  634. if ( (*node)->sync_entry != NULL ) {
  635. slapi_entry_free( (*node)->sync_entry );
  636. (*node)->sync_entry = NULL;
  637. }
  638. slapi_ch_free( (void **)node );
  639. }
  640. }