repl_extop.c 74 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2021 Red Hat, Inc.
  4. * All rights reserved.
  5. *
  6. * License: GPL (version 3 or any later version).
  7. * See LICENSE for details.
  8. * END COPYRIGHT BLOCK **/
  9. #ifdef HAVE_CONFIG_H
  10. #include <config.h>
  11. #endif
  12. #include "slapi-plugin.h"
  13. #include "repl5.h"
  14. #include "repl5_prot_private.h"
  15. #include "cl5_api.h"
  16. #define ENABLE_TEST_TICKET_374
  17. #ifdef ENABLE_TEST_TICKET_374
  18. #include <unistd.h> /* for usleep */
  19. #endif
  20. /*
  21. * repl_extop.c - there are two types of functions in this file:
  22. * - Code that implements an extended operation plugin.
  23. * The replication DLL arranges for this code to
  24. * be called when a StartNSDS50ReplicationRequest
  25. * or an EndNSDS50ReplicationRequest extended operation
  26. * is received.
  27. * - Code that sends extended operations on an already-
  28. * established client connection.
  29. *
  30. * The requestValue portion of the StartNSDS50ReplicationRequest
  31. * looks like this:
  32. *
  33. * requestValue ::= SEQUENCE {
  34. * replProtocolOID LDAPOID,
  35. * replicatedTree LDAPDN,
  36. supplierRUV OCTET STRING
  37. * referralURLs SET of LDAPURL OPTIONAL
  38. * csn OCTET STRING OPTIONAL
  39. * }
  40. *
  41. */
  42. static int check_replica_id_uniqueness(Replica *replica, RUV *supplier_ruv);
  43. static int
  44. encode_ruv(BerElement *ber, const RUV *ruv)
  45. {
  46. int rc = LDAP_SUCCESS;
  47. struct berval **bvals = NULL;
  48. PR_ASSERT(ber);
  49. PR_ASSERT(ruv);
  50. if (ruv_to_bervals(ruv, &bvals) != 0) {
  51. rc = LDAP_OPERATIONS_ERROR;
  52. goto done;
  53. }
  54. if (ber_printf(ber, "[V]", bvals) == -1) {
  55. rc = LDAP_ENCODING_ERROR;
  56. goto done;
  57. }
  58. rc = LDAP_SUCCESS;
  59. done:
  60. if (bvals)
  61. ber_bvecfree(bvals);
  62. return rc;
  63. }
  64. static void
  65. ruv_dump_to_log(const RUV *ruv, char *log_name)
  66. {
  67. if (!ruv) {
  68. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "%s: RUV: None\n", log_name);
  69. } else {
  70. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "%s: RUV:\n", log_name);
  71. ruv_dump(ruv, log_name, NULL);
  72. }
  73. }
  74. /* The data_guid and data parameters should only be set if we
  75. * are talking with a 9.0 replica. */
  76. static struct berval *
  77. create_ReplicationExtopPayload(const char *protocol_oid,
  78. const char *repl_root,
  79. char **extra_referrals,
  80. CSN *csn,
  81. int send_end,
  82. const char *data_guid,
  83. const struct berval *data)
  84. {
  85. struct berval *req_data = NULL;
  86. BerElement *tmp_bere = NULL;
  87. int rc = 0;
  88. Object *ruv_obj = NULL;
  89. Replica *repl;
  90. RUV *ruv;
  91. Slapi_DN *sdn = NULL;
  92. PR_ASSERT(protocol_oid != NULL || send_end);
  93. PR_ASSERT(repl_root != NULL);
  94. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  95. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "create_ReplicationExtopPayload - "
  96. "encoding '%s' payload...\n",
  97. send_end ? "End Replication" : "Start Replication");
  98. }
  99. /* Create the request data */
  100. if ((tmp_bere = der_alloc()) == NULL) {
  101. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  102. "encoding failed: der_alloc failed\n");
  103. goto loser;
  104. }
  105. if (!send_end) {
  106. if (ber_printf(tmp_bere, "{ss", protocol_oid, repl_root) == -1) {
  107. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  108. "encoding failed: ber_printf failed - protocol_oid (%s) repl_root (%s)\n",
  109. protocol_oid, repl_root);
  110. goto loser;
  111. }
  112. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  113. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "create_ReplicationExtopPayload - "
  114. "encoding protocol_oid: %s\n", protocol_oid);
  115. }
  116. } else {
  117. if (ber_printf(tmp_bere, "{s", repl_root) == -1) {
  118. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  119. "encoding failed: ber_printf failed - repl_root (%s)\n",
  120. repl_root);
  121. goto loser;
  122. }
  123. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  124. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "create_ReplicationExtopPayload - "
  125. "encoding repl_root: %s\n", repl_root);
  126. }
  127. }
  128. sdn = slapi_sdn_new_dn_byref(repl_root);
  129. repl = replica_get_replica_from_dn(sdn);
  130. if (repl == NULL) {
  131. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  132. "encoding failed: failed to get replica from dn (%s)\n",
  133. slapi_sdn_get_dn(sdn));
  134. goto loser;
  135. }
  136. ruv_obj = replica_get_ruv(repl);
  137. if (ruv_obj == NULL) {
  138. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  139. "encoding failed: failed to get ruv from replica suffix (%s)\n",
  140. slapi_sdn_get_dn(sdn));
  141. goto loser;
  142. }
  143. ruv = object_get_data(ruv_obj);
  144. PR_ASSERT(ruv);
  145. /* send supplier's ruv so that consumer can build its own referrals.
  146. In case of total protocol, it is also used as consumer's ruv once
  147. protocol successfully completes */
  148. /* We need to encode and send each time the local ruv in case we have changed it */
  149. rc = encode_ruv(tmp_bere, ruv);
  150. if (rc != 0) {
  151. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  152. "encoding failed: encode_ruv failed for replica suffix (%s)\n",
  153. slapi_sdn_get_dn(sdn));
  154. goto loser;
  155. }
  156. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  157. ruv_dump_to_log(ruv, "create_ReplicationExtopPayload");
  158. }
  159. if (!send_end) {
  160. char s[CSN_STRSIZE];
  161. ReplicaId rid;
  162. char *local_replica_referral[2] = {0};
  163. char **referrals_to_send = NULL;
  164. /* Add the referral URL(s), if present */
  165. rid = replica_get_rid(repl);
  166. if (!ruv_contains_replica(ruv, rid)) {
  167. /*
  168. * In the event that there is no RUV component for this replica (e.g.
  169. * if the database was just loaded from LDIF and no local CSNs have been
  170. * generated), then we need to explicitly add this server to the list
  171. * of referrals, since it wouldn't have been sent with the RUV.
  172. */
  173. local_replica_referral[0] = (char *)multisupplier_get_local_purl(); /* XXXggood had to cast away const */
  174. }
  175. charray_merge(&referrals_to_send, extra_referrals, 0);
  176. charray_merge(&referrals_to_send, local_replica_referral, 0);
  177. if (NULL != referrals_to_send) {
  178. if (ber_printf(tmp_bere, "[v]", referrals_to_send) == -1) {
  179. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  180. "encoding failed: ber_printf (referrals_to_send)\n");
  181. goto loser;
  182. }
  183. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  184. for (size_t i = 0; referrals_to_send[i]; i++) {
  185. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "create_ReplicationExtopPayload - "
  186. "encoding ref: %s\n", referrals_to_send[i]);
  187. }
  188. }
  189. slapi_ch_free((void **)&referrals_to_send);
  190. }
  191. /* Add the CSN */
  192. PR_ASSERT(NULL != csn);
  193. if (ber_printf(tmp_bere, "s", csn_as_string(csn, PR_FALSE, s)) == -1) {
  194. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  195. "encoding failed: ber_printf (csnstr)\n");
  196. goto loser;
  197. }
  198. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  199. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "create_ReplicationExtopPayload - "
  200. "encoding csn: %s\n", csn_as_string(csn, PR_FALSE, s));
  201. }
  202. }
  203. /* If we have data to send to a 9.0 style replica, set it here. */
  204. if (data_guid && data) {
  205. if (ber_printf(tmp_bere, "sO", data_guid, data) == -1) {
  206. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  207. "encoding failed: ber_printf (data_guid, data)\n");
  208. goto loser;
  209. }
  210. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  211. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "create_ReplicationExtopPayload - "
  212. "encoding data_guid (%s) data (%s:%ld)\n",
  213. data_guid, data->bv_val, data->bv_len);
  214. }
  215. }
  216. if (ber_printf(tmp_bere, "}") == -1) {
  217. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  218. "encoding failed: ber_printf (set's end)\n");
  219. goto loser;
  220. }
  221. if (ber_flatten(tmp_bere, &req_data) == -1) {
  222. slapi_log_err(SLAPI_LOG_ERR, "create_ReplicationExtopPayload",
  223. "encoding failed: ber_flatten failed\n");
  224. goto loser;
  225. }
  226. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  227. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "create_ReplicationExtopPayload - "
  228. "Encoding finished\n");
  229. }
  230. /* Success */
  231. goto done;
  232. loser:
  233. /* Free stuff we allocated */
  234. if (NULL != req_data) {
  235. ber_bvfree(req_data);
  236. req_data = NULL;
  237. }
  238. done:
  239. if (NULL != tmp_bere) {
  240. ber_free(tmp_bere, 1);
  241. tmp_bere = NULL;
  242. }
  243. if (NULL != sdn) {
  244. slapi_sdn_free(&sdn); /* Put on stack instead of allocating? */
  245. }
  246. if (NULL != ruv_obj) {
  247. object_release(ruv_obj);
  248. }
  249. return req_data;
  250. }
  251. struct berval *
  252. NSDS50StartReplicationRequest_new(const char *protocol_oid,
  253. const char *repl_root,
  254. char **extra_referrals,
  255. CSN *csn)
  256. {
  257. return (create_ReplicationExtopPayload(protocol_oid,
  258. repl_root, extra_referrals, csn, 0, 0, 0));
  259. }
  260. struct berval *
  261. NSDS90StartReplicationRequest_new(const char *protocol_oid,
  262. const char *repl_root,
  263. char **extra_referrals,
  264. CSN *csn,
  265. const char *data_guid,
  266. const struct berval *data)
  267. {
  268. return (create_ReplicationExtopPayload(protocol_oid,
  269. repl_root, extra_referrals, csn, 0, data_guid, data));
  270. }
  271. struct berval *
  272. NSDS50EndReplicationRequest_new(char *repl_root)
  273. {
  274. return (create_ReplicationExtopPayload(NULL, repl_root, NULL, NULL, 1, 0, 0));
  275. }
  276. static int
  277. decode_ruv(BerElement *ber, RUV **ruv)
  278. {
  279. int rc = -1;
  280. struct berval **bvals = NULL;
  281. PR_ASSERT(ber && ruv);
  282. if (ber_scanf(ber, "[V]", &bvals) == LBER_DEFAULT) {
  283. goto done;
  284. }
  285. if (ruv_init_from_bervals(bvals, ruv) != 0) {
  286. goto done;
  287. }
  288. rc = 0;
  289. done:
  290. if (bvals)
  291. ber_bvecfree(bvals);
  292. return rc;
  293. }
  294. /*
  295. * Decode an NSDS50 or NSDS90 Start Replication Request extended
  296. * operation. Returns 0 on success, -1 on decoding error.
  297. * The caller is responsible for freeing protocol_oid,
  298. * repl_root, referrals, csn, data_guid, and data.
  299. */
  300. static int
  301. decode_startrepl_extop(Slapi_PBlock *pb, char **protocol_oid, char **repl_root, RUV **supplier_ruv, char ***extra_referrals, char **csnstr, char **data_guid, struct berval **data, int *is90)
  302. {
  303. char *extop_oid = NULL;
  304. struct berval *extop_value = NULL;
  305. BerElement *tmp_bere = NULL;
  306. ber_len_t len;
  307. int rc = 0;
  308. PR_ASSERT(pb && protocol_oid && repl_root && supplier_ruv && extra_referrals && csnstr && data_guid && data);
  309. *protocol_oid = NULL;
  310. *repl_root = NULL;
  311. *supplier_ruv = NULL;
  312. *extra_referrals = NULL;
  313. *csnstr = NULL;
  314. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid);
  315. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_value);
  316. if ((NULL == extop_oid) ||
  317. ((strcmp(extop_oid, REPL_START_NSDS50_REPLICATION_REQUEST_OID) != 0) &&
  318. (strcmp(extop_oid, REPL_START_NSDS90_REPLICATION_REQUEST_OID) != 0)) ||
  319. !BV_HAS_DATA(extop_value))
  320. {
  321. /* bogus */
  322. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  323. "decoding failed: extop_oid (%s) (%s) extop_value (%s)\n",
  324. NULL == extop_oid ? "NULL" : "Ok",
  325. extop_oid ? extop_oid : "",
  326. extop_value ? !BV_HAS_DATA(extop_value) ? "No data" : "Ok" : "No data");
  327. rc = -1;
  328. goto free_and_return;
  329. }
  330. /* Set a flag to let the caller know if this is a 9.0 style start extop */
  331. if (strcmp(extop_oid, REPL_START_NSDS90_REPLICATION_REQUEST_OID) == 0) {
  332. *is90 = 1;
  333. } else {
  334. *is90 = 0;
  335. }
  336. if ((tmp_bere = ber_init(extop_value)) == NULL) {
  337. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  338. "decoding failed: ber_init for extop_value (%s:%lu)\n",
  339. extop_value->bv_val, extop_value->bv_len);
  340. rc = -1;
  341. goto free_and_return;
  342. }
  343. if (ber_scanf(tmp_bere, "{") == LBER_ERROR) {
  344. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  345. "decoding failed: ber_scanf 1\n");
  346. rc = -1;
  347. goto free_and_return;
  348. }
  349. /* Get the required protocol OID and root of replicated subtree */
  350. if (ber_get_stringa(tmp_bere, protocol_oid) == LBER_DEFAULT) {
  351. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  352. "decoding failed: ber_get_stringa (protocol_oid)\n");
  353. rc = -1;
  354. goto free_and_return;
  355. }
  356. if (ber_get_stringa(tmp_bere, repl_root) == LBER_DEFAULT) {
  357. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  358. "decoding failed: ber_get_stringa (repl_root)\n");
  359. rc = -1;
  360. goto free_and_return;
  361. }
  362. /* get supplier's ruv */
  363. if (decode_ruv(tmp_bere, supplier_ruv) == -1) {
  364. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  365. "decoding failed: decode_ruv (supplier_ruv)\n");
  366. rc = -1;
  367. goto free_and_return;
  368. }
  369. /* Get the optional set of referral URLs */
  370. if (ber_peek_tag(tmp_bere, &len) == LBER_SET) {
  371. if (ber_scanf(tmp_bere, "[v]", extra_referrals) == LBER_ERROR) {
  372. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  373. "decoding failed: ber_scanf (extra_referrals)\n");
  374. rc = -1;
  375. goto free_and_return;
  376. }
  377. }
  378. /* Get the CSN */
  379. if (ber_get_stringa(tmp_bere, csnstr) == LBER_ERROR) {
  380. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  381. "decoding failed: ber_get_stringa (csnstr)\n");
  382. rc = -1;
  383. goto free_and_return;
  384. }
  385. /* Get the optional replication session callback data. */
  386. if (ber_peek_tag(tmp_bere, &len) == LBER_OCTETSTRING) {
  387. if (ber_get_stringa(tmp_bere, data_guid) == LBER_ERROR) {
  388. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  389. "decoding failed: ber_get_stringa (data_guid)\n");
  390. rc = -1;
  391. goto free_and_return;
  392. }
  393. /* If a data_guid was specified, data must be specified as well. */
  394. if (ber_peek_tag(tmp_bere, &len) == LBER_OCTETSTRING) {
  395. if (ber_get_stringal(tmp_bere, data) == LBER_ERROR) {
  396. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  397. "decoding failed: ber_get_stringal (data)\n");
  398. rc = -1;
  399. goto free_and_return;
  400. }
  401. } else {
  402. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  403. "decoding failed: ber_peek_tag\n");
  404. rc = -1;
  405. goto free_and_return;
  406. }
  407. }
  408. if (ber_scanf(tmp_bere, "}") == LBER_ERROR) {
  409. slapi_log_err(SLAPI_LOG_ERR, "decode_startrepl_extop",
  410. "decoding failed: ber_scanf 2\n");
  411. rc = -1;
  412. goto free_and_return;
  413. }
  414. free_and_return:
  415. if (-1 == rc) {
  416. /* Free everything when error encountered */
  417. /* slapi_ch_free accepts NULL pointer */
  418. slapi_ch_free((void **)protocol_oid);
  419. slapi_ch_free((void **)repl_root);
  420. slapi_ch_array_free(*extra_referrals);
  421. *extra_referrals = NULL;
  422. slapi_ch_free((void **)csnstr);
  423. if (*supplier_ruv) {
  424. ruv_destroy(supplier_ruv);
  425. }
  426. } else if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  427. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  428. "decode_startrepl_extop - decoding payload...\n");
  429. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  430. "decode_startrepl_extop - decoded protocol_oid: %s\n", *protocol_oid);
  431. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  432. "decode_startrepl_extop - decoded repl_root: %s\n", *repl_root);
  433. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  434. "decode_startrepl_extop - decoded csn: %s\n", *csnstr);
  435. ruv_dump_to_log(*supplier_ruv, "decode_startrepl_extop");
  436. for (size_t i = 0; *extra_referrals && (*extra_referrals)[i]; i++) {
  437. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "decode_startrepl_extop - "
  438. "decoded referral: %s\n", (*extra_referrals)[i]);
  439. }
  440. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  441. "decode_startrepl_extop - Finshed decoding payload.\n");
  442. }
  443. if (NULL != tmp_bere) {
  444. ber_free(tmp_bere, 1);
  445. tmp_bere = NULL;
  446. }
  447. return rc;
  448. }
  449. /*
  450. * Decode an NSDS50 End Replication Request extended
  451. * operation. Returns 0 on success, -1 on decoding error.
  452. * The caller is responsible for freeing repl_root.
  453. */
  454. static int
  455. decode_endrepl_extop(Slapi_PBlock *pb, char **repl_root)
  456. {
  457. char *extop_oid = NULL;
  458. struct berval *extop_value = NULL;
  459. BerElement *tmp_bere = NULL;
  460. int rc = 0;
  461. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid);
  462. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_value);
  463. if ((NULL == extop_oid) ||
  464. (strcmp(extop_oid, REPL_END_NSDS50_REPLICATION_REQUEST_OID) != 0) ||
  465. !BV_HAS_DATA(extop_value))
  466. {
  467. /* bogus */
  468. slapi_log_err(SLAPI_LOG_ERR, "decode_endrepl_extop",
  469. "decoding failed: extop_oid (%s) correct oid (%s) extop_value data (%s)\n",
  470. extop_oid ? extop_oid : "NULL",
  471. extop_oid ? strcmp(extop_oid, REPL_END_NSDS50_REPLICATION_REQUEST_OID) != 0 ? "wrong oid" : "correct oid" : "NULL",
  472. !BV_HAS_DATA(extop_value) ? "No data" : "Has data");
  473. rc = -1;
  474. goto free_and_return;
  475. }
  476. if ((tmp_bere = ber_init(extop_value)) == NULL) {
  477. slapi_log_err(SLAPI_LOG_ERR, "decode_endrepl_extop",
  478. "decoding failed: ber_init failed: extop_value (%s:%lu)\n",
  479. extop_value->bv_val, extop_value->bv_len);
  480. rc = -1;
  481. goto free_and_return;
  482. }
  483. if (ber_scanf(tmp_bere, "{") == LBER_DEFAULT) {
  484. slapi_log_err(SLAPI_LOG_ERR, "decode_endrepl_extop",
  485. "decoding failed: ber_scanf failed1\n");
  486. rc = -1;
  487. goto free_and_return;
  488. }
  489. /* Get the required root of replicated subtree */
  490. if (ber_get_stringa(tmp_bere, repl_root) == LBER_DEFAULT) {
  491. slapi_log_err(SLAPI_LOG_ERR, "decode_endrepl_extop",
  492. "decoding failed: ber_get_stringa failed\n");
  493. rc = -1;
  494. goto free_and_return;
  495. }
  496. if (ber_scanf(tmp_bere, "}") == LBER_DEFAULT) {
  497. slapi_log_err(SLAPI_LOG_ERR, "decode_endrepl_extop",
  498. "decoding failed: ber_scanf2 failed\n");
  499. rc = -1;
  500. goto free_and_return;
  501. }
  502. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  503. slapi_log_err(SLAPI_LOG_REPL, "decode_endrepl_extop",
  504. "Decoding payload...\n");
  505. slapi_log_err(SLAPI_LOG_REPL, "decode_endrepl_extop",
  506. "Decoded repl_root: %s\n", *repl_root);
  507. slapi_log_err(SLAPI_LOG_REPL, "decode_endrepl_extop",
  508. "Finished decoding payload.\n");
  509. }
  510. free_and_return:
  511. if (NULL != tmp_bere) {
  512. ber_free(tmp_bere, 1);
  513. tmp_bere = NULL;
  514. }
  515. return rc;
  516. }
  517. /*
  518. * Decode an NSDS50ReplicationResponse or NSDS90ReplicationResponse
  519. * extended response. The extended response just contains a sequence
  520. * that contains:
  521. * 1) An integer response code
  522. * 2) An optional array of bervals representing the consumer
  523. * replica's update vector
  524. * 3) An optional data guid and data string if this is a 9.0
  525. * style response
  526. * Returns 0 on success, or -1 if the response could not be parsed.
  527. */
  528. int
  529. decode_repl_ext_response(struct berval *bvdata, int *response_code, struct berval ***ruv_bervals, char **data_guid, struct berval **data)
  530. {
  531. BerElement *tmp_bere = NULL;
  532. int return_value = 0;
  533. PR_ASSERT(NULL != response_code);
  534. PR_ASSERT(NULL != ruv_bervals);
  535. if ((NULL == response_code) || (NULL == ruv_bervals) ||
  536. (NULL == data_guid) || (NULL == data) || !BV_HAS_DATA(bvdata))
  537. {
  538. slapi_log_err(SLAPI_LOG_ERR, "decode_repl_ext_response",
  539. "decoding failed: response_code (%s) ruv_bervals (%s) data_guid (%s) data (%s) bvdata (%s)\n",
  540. NULL == response_code ? "NULL" : "Ok",
  541. NULL == ruv_bervals ? "NULL" : "Ok",
  542. NULL == data_guid ? "NULL" : "Ok",
  543. NULL == data ? "NULL" : "Ok",
  544. !BV_HAS_DATA(bvdata) ? "No data" : "Ok");
  545. return_value = -1;
  546. } else {
  547. ber_len_t len;
  548. ber_int_t temp_response_code = 0;
  549. *ruv_bervals = NULL;
  550. if ((tmp_bere = ber_init(bvdata)) == NULL) {
  551. slapi_log_err(SLAPI_LOG_ERR, "decode_repl_ext_response",
  552. "decoding failed: ber_init failed from bvdata (%s:%lu)\n",
  553. bvdata->bv_val, bvdata->bv_len);
  554. return_value = -1;
  555. } else if (ber_scanf(tmp_bere, "{e", &temp_response_code) == LBER_ERROR) {
  556. slapi_log_err(SLAPI_LOG_ERR, "decode_repl_ext_response",
  557. "decoding failed: ber_scanf failed\n");
  558. return_value = -1;
  559. } else if (ber_peek_tag(tmp_bere, &len) == LBER_SEQUENCE) {
  560. if (ber_scanf(tmp_bere, "{V}", ruv_bervals) == LBER_ERROR) {
  561. slapi_log_err(SLAPI_LOG_ERR, "decode_repl_ext_response",
  562. "decoding failed: ber_scanf2 failed from ruv_bervals\n");
  563. return_value = -1;
  564. }
  565. }
  566. /* Check for optional data from replication session callback */
  567. if (ber_peek_tag(tmp_bere, &len) == LBER_OCTETSTRING) {
  568. if (ber_scanf(tmp_bere, "aO}", data_guid, data) == LBER_ERROR) {
  569. slapi_log_err(SLAPI_LOG_ERR, "decode_repl_ext_response",
  570. "decoding failed: ber_scanf3 failed from data_guid & data\n");
  571. return_value = -1;
  572. }
  573. } else if (ber_scanf(tmp_bere, "}") == LBER_ERROR) {
  574. slapi_log_err(SLAPI_LOG_ERR, "decode_repl_ext_response",
  575. "decoding failed: ber_scanf4 failed\n");
  576. return_value = -1;
  577. }
  578. *response_code = (int)temp_response_code;
  579. }
  580. if (0 != return_value) {
  581. if (NULL != ruv_bervals && NULL != *ruv_bervals) {
  582. ber_bvecfree(*ruv_bervals);
  583. }
  584. }
  585. if (NULL != tmp_bere) {
  586. ber_free(tmp_bere, 1);
  587. tmp_bere = NULL;
  588. }
  589. return return_value;
  590. }
  591. /*
  592. * This plugin entry point is called whenever a
  593. * StartNSDS50ReplicationRequest is received.
  594. */
  595. int
  596. multisupplier_extop_StartNSDS50ReplicationRequest(Slapi_PBlock *pb)
  597. {
  598. int return_value = SLAPI_PLUGIN_EXTENDED_NOT_HANDLED;
  599. ber_int_t response = 0;
  600. int rc = 0;
  601. BerElement *resp_bere = NULL;
  602. struct berval *resp_bval = NULL;
  603. char *protocol_oid = NULL;
  604. char *repl_root = NULL;
  605. Slapi_DN *repl_root_sdn = NULL;
  606. char **referrals = NULL;
  607. Replica *replica = NULL;
  608. void *conn;
  609. consumer_connection_extension *connext = NULL;
  610. char *replicacsnstr = NULL;
  611. CSN *replicacsn = NULL;
  612. int zero = 0;
  613. int one = 1;
  614. RUV *ruv = NULL;
  615. struct berval **ruv_bervals = NULL;
  616. CSNGen *gen = NULL;
  617. Object *gen_obj = NULL;
  618. Slapi_DN *bind_sdn = NULL;
  619. char *bind_dn = NULL;
  620. Object *ruv_object = NULL;
  621. RUV *supplier_ruv = NULL;
  622. PRUint64 connid = 0;
  623. int opid = 0;
  624. PRBool isInc = PR_FALSE; /* true if incremental update */
  625. char *locking_purl = NULL; /* the supplier contacting us */
  626. char *current_purl = NULL; /* the supplier which already has exclusive access */
  627. char locking_session[42] = {0};
  628. char *data_guid = NULL;
  629. struct berval *data = NULL;
  630. int is90 = 0;
  631. /* Decode the extended operation */
  632. if (decode_startrepl_extop(pb, &protocol_oid, &repl_root, &supplier_ruv,
  633. &referrals, &replicacsnstr, &data_guid, &data, &is90) == -1) {
  634. response = NSDS50_REPL_DECODING_ERROR;
  635. goto send_response;
  636. }
  637. if (NULL == protocol_oid || NULL == repl_root || NULL == replicacsnstr) {
  638. response = NSDS50_REPL_DECODING_ERROR;
  639. goto send_response;
  640. }
  641. slapi_pblock_get(pb, SLAPI_CONN_ID, &connid);
  642. slapi_pblock_get(pb, SLAPI_OPERATION_ID, &opid);
  643. /*
  644. * Get a hold of the connection extension object and
  645. * make sure it's there.
  646. */
  647. slapi_pblock_get(pb, SLAPI_CONNECTION, &conn);
  648. connext = consumer_connection_extension_acquire_exclusive_access(conn, connid, opid);
  649. if (NULL == connext) {
  650. /* TEL 20120531: This used to be a much worse and unexpected thing
  651. * before acquiring exclusive access to the connext. Now it should
  652. * be highly unusual, but not completely unheard of. We don't want to
  653. * return an internal error here as before, because it will eventually
  654. * result in a fatal error on the other end. Better to tell it
  655. * we are busy instead--which is also probably true. */
  656. response = NSDS50_REPL_REPLICA_BUSY;
  657. goto send_response;
  658. }
  659. /* Verify that we know about this replication protocol OID */
  660. if (strcmp(protocol_oid, REPL_NSDS50_INCREMENTAL_PROTOCOL_OID) == 0) {
  661. if (repl_session_plugin_call_recv_acquire_cb(repl_root, 0 /* is_total == FALSE */,
  662. data_guid, data)) {
  663. slapi_ch_free_string(&data_guid);
  664. ber_bvfree(data);
  665. data = NULL;
  666. response = NSDS50_REPL_BACKOFF;
  667. goto send_response;
  668. } else {
  669. slapi_ch_free_string(&data_guid);
  670. ber_bvfree(data);
  671. data = NULL;
  672. }
  673. /* Stash info that this is an incremental update session */
  674. connext->repl_protocol_version = REPL_PROTOCOL_50_INCREMENTAL;
  675. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  676. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  677. "conn=%" PRIu64 " op=%d repl=\"%s\": Begin incremental protocol\n",
  678. connid, opid, repl_root);
  679. isInc = PR_TRUE;
  680. } else if (strcmp(protocol_oid, REPL_NSDS50_TOTAL_PROTOCOL_OID) == 0) {
  681. if (repl_session_plugin_call_recv_acquire_cb(repl_root, 1 /* is_total == TRUE */,
  682. data_guid, data)) {
  683. slapi_ch_free_string(&data_guid);
  684. ber_bvfree(data);
  685. data = NULL;
  686. response = NSDS50_REPL_DISABLED;
  687. goto send_response;
  688. } else {
  689. slapi_ch_free_string(&data_guid);
  690. ber_bvfree(data);
  691. data = NULL;
  692. }
  693. /* Stash info that this is a total update session */
  694. if (NULL != connext) {
  695. connext->repl_protocol_version = REPL_PROTOCOL_50_TOTALUPDATE;
  696. }
  697. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  698. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  699. "conn=%" PRIu64 " op=%d repl=\"%s\": Begin total protocol\n",
  700. connid, opid, repl_root);
  701. isInc = PR_FALSE;
  702. } else if (strcmp(protocol_oid, REPL_NSDS71_INCREMENTAL_PROTOCOL_OID) == 0) {
  703. /* Stash info that this is an incremental update session */
  704. connext->repl_protocol_version = REPL_PROTOCOL_50_INCREMENTAL;
  705. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  706. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  707. "conn=%" PRIu64 " op=%d repl=\"%s\": Begin 7.1 incremental protocol\n",
  708. connid, opid, repl_root);
  709. isInc = PR_TRUE;
  710. } else if (strcmp(protocol_oid, REPL_NSDS71_TOTAL_PROTOCOL_OID) == 0) {
  711. /* Stash info that this is a total update session */
  712. if (NULL != connext) {
  713. connext->repl_protocol_version = REPL_PROTOCOL_50_TOTALUPDATE;
  714. }
  715. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  716. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  717. "conn=%" PRIu64 " op=%d repl=\"%s\": Begin 7.1 total protocol\n",
  718. connid, opid, repl_root);
  719. isInc = PR_FALSE;
  720. } else {
  721. /* Unknown replication protocol */
  722. response = NSDS50_REPL_UNKNOWN_UPDATE_PROTOCOL;
  723. goto send_response;
  724. }
  725. /* Verify that repl_root names a valid replicated area */
  726. if ((repl_root_sdn = slapi_sdn_new_dn_byval(repl_root)) == NULL) {
  727. response = NSDS50_REPL_INTERNAL_ERROR;
  728. goto send_response;
  729. }
  730. /* see if this replica is being configured and wait for it */
  731. if (replica_is_being_configured(repl_root)) {
  732. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  733. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  734. "conn=%" PRIu64 " op=%d replica=\"%s\": "
  735. "Replica is being configured: try again later\n",
  736. connid, opid, repl_root);
  737. response = NSDS50_REPL_REPLICA_BUSY;
  738. goto send_response;
  739. }
  740. replica = replica_get_replica_from_dn(repl_root_sdn);
  741. if (NULL == replica) {
  742. response = NSDS50_REPL_NO_SUCH_REPLICA;
  743. goto send_response;
  744. }
  745. if (REPL_PROTOCOL_50_TOTALUPDATE == connext->repl_protocol_version) {
  746. /* If total update has been initiated against other replicas or
  747. * this replica is already being initialized, we should return
  748. * an error immediately. */
  749. if (replica_is_state_flag_set(replica,
  750. REPLICA_TOTAL_EXCL_SEND | REPLICA_TOTAL_EXCL_RECV)) {
  751. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  752. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  753. "%s: total update on is initiated on the replica. Cannot execute the total update from other supplier.\n",
  754. repl_root);
  755. response = NSDS50_REPL_REPLICA_BUSY;
  756. goto send_response;
  757. } else {
  758. replica_set_state_flag(replica, REPLICA_TOTAL_EXCL_RECV, 0);
  759. }
  760. }
  761. /* Check that bind dn is authorized to supply replication updates */
  762. slapi_pblock_get(pb, SLAPI_CONN_DN, &bind_dn); /* bind_dn is allocated */
  763. bind_sdn = slapi_sdn_new_dn_passin(bind_dn);
  764. if (replica_is_updatedn(replica, bind_sdn) == PR_FALSE) {
  765. response = NSDS50_REPL_PERMISSION_DENIED;
  766. goto send_response;
  767. }
  768. /* Check received CSN for clock skew */
  769. gen_obj = replica_get_csngen(replica);
  770. if (NULL != gen_obj) {
  771. gen = object_get_data(gen_obj);
  772. if (NULL != gen) {
  773. replicacsn = csn_new_by_string(replicacsnstr);
  774. if (NULL != replicacsn) {
  775. /* ONREPL - we used to manage clock skew here. However, csn generator
  776. code already does it. The csngen also manages local skew caused by
  777. system clock reset, so to keep it consistent, I removed code from here */
  778. /* update the state of the csn generator */
  779. rc = replica_update_csngen_state_ext(replica, supplier_ruv, replicacsn); /* too much skew */
  780. if (rc == CSN_LIMIT_EXCEEDED) {
  781. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  782. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  783. "conn=%" PRIu64 " op=%d repl=\"%s\": "
  784. "Excessive clock skew from supplier RUV\n",
  785. connid, opid, repl_root);
  786. response = NSDS50_REPL_EXCESSIVE_CLOCK_SKEW;
  787. goto send_response;
  788. } else if (rc != 0) {
  789. /* Oops, problem csn or ruv format, or memory, or .... */
  790. response = NSDS50_REPL_INTERNAL_ERROR;
  791. goto send_response;
  792. }
  793. } else {
  794. /* Oops, csnstr couldn't be converted */
  795. response = NSDS50_REPL_INTERNAL_ERROR;
  796. goto send_response;
  797. }
  798. } else {
  799. /* Oops, no csn generator */
  800. response = NSDS50_REPL_INTERNAL_ERROR;
  801. goto send_response;
  802. }
  803. } else {
  804. /* Oops, no csn generator object */
  805. response = NSDS50_REPL_INTERNAL_ERROR;
  806. goto send_response;
  807. }
  808. if (check_replica_id_uniqueness(replica, supplier_ruv) != 0) {
  809. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  810. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  811. "conn=%" PRIu64 " op=%d repl=\"%s\": "
  812. "Replica has same replicaID %d as supplier\n",
  813. connid, opid, repl_root, replica_get_rid(replica));
  814. response = NSDS50_REPL_REPLICAID_ERROR;
  815. goto send_response;
  816. }
  817. /* Attempt to acquire exclusive access to the replicated area */
  818. /* Since partial URL is always the supplier, this locking_purl does not
  819. * help us to know the true locker when it is a hub. Change to use
  820. * the session's conn id and op id to identify the the supplier.
  821. */
  822. /* junkrc = ruv_get_first_id_and_purl(supplier_ruv, &junkrid, &locking_purl); */
  823. snprintf(locking_session, sizeof(locking_session), "conn=%" PRIu64 " id=%d",
  824. connid, opid);
  825. locking_purl = &locking_session[0];
  826. if (replica_get_exclusive_access(replica, &isInc, connid, opid,
  827. locking_purl,
  828. &current_purl) == PR_FALSE) {
  829. locking_purl = NULL; /* no dangling pointers */
  830. response = NSDS50_REPL_REPLICA_BUSY;
  831. goto send_response;
  832. } else {
  833. locking_purl = NULL; /* no dangling pointers */
  834. /* Stick the replica object pointer in the connection extension */
  835. connext->replica_acquired = replica;
  836. }
  837. /* remove this code once ticket 374 is fixed */
  838. #ifdef ENABLE_TEST_TICKET_374
  839. #include <unistd.h>
  840. if (getenv("SLAPD_TEST_TICKET_374") && (opid > 20)) {
  841. int i = 0;
  842. int max = 480 * 5;
  843. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  844. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  845. "conn=%" PRIu64 " op=%d repl=\"%s\": "
  846. "374 - Starting sleep: connext->repl_protocol_version == %d\n",
  847. connid, opid, repl_root, connext->repl_protocol_version);
  848. while (REPL_PROTOCOL_50_INCREMENTAL == connext->repl_protocol_version && i++ < max) {
  849. usleep(200000);
  850. }
  851. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  852. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  853. "conn=%" PRIu64 " op=%d repl=\"%s\": "
  854. "374 - Finished sleep: connext->repl_protocol_version == %d\n",
  855. connid, opid, repl_root, connext->repl_protocol_version);
  856. }
  857. #endif
  858. /* If this is incremental protocol get replica's ruv to return to the supplier */
  859. if (connext->repl_protocol_version == REPL_PROTOCOL_50_INCREMENTAL) {
  860. ruv_object = replica_get_ruv(replica);
  861. if (NULL != ruv_object) {
  862. ruv = object_get_data(ruv_object);
  863. (void)ruv_to_bervals(ruv, &ruv_bervals);
  864. object_release(ruv_object);
  865. }
  866. }
  867. /*
  868. * Save the supplier ruv in the connection extension so it can
  869. * either (a) be installed upon successful initialization (if this
  870. * is a total update session) or used to update referral information
  871. * for new replicas that show up in the supplier's RUV.
  872. */
  873. /*
  874. * the supplier_ruv may have been set before, so free it here
  875. * (in ruv_copy_and_destroy)
  876. */
  877. ruv_copy_and_destroy(&supplier_ruv, (RUV **)&connext->supplier_ruv);
  878. /* incremental update protocol */
  879. if (connext->repl_protocol_version == REPL_PROTOCOL_50_INCREMENTAL) {
  880. /* The supplier ruv may have changed, so let's update the referrals */
  881. consumer5_set_mapping_tree_state_for_replica(replica, connext->supplier_ruv);
  882. }
  883. /* total update protocol */
  884. else if (connext->repl_protocol_version == REPL_PROTOCOL_50_TOTALUPDATE) {
  885. char *mtnstate = slapi_mtn_get_state(repl_root_sdn);
  886. char **mtnreferral = slapi_mtn_get_referral(repl_root_sdn);
  887. /* richm 20041118 - we do not want to reap tombstones while there is
  888. a total update in progress, so shut it down */
  889. replica_set_tombstone_reap_stop(replica, PR_TRUE);
  890. /* richm 20010831 - set the mapping tree to the referral state *before*
  891. we invoke slapi_start_bulk_import - see bug 556992 -
  892. slapi_start_bulk_import sets the database offline, if an operation comes
  893. in while the database is offline but the mapping tree is not referring yet,
  894. the server gets confused
  895. */
  896. /* During a total update we refer *all* operations */
  897. repl_set_mtn_state_and_referrals(repl_root_sdn, STATE_REFERRAL,
  898. connext->supplier_ruv, NULL, referrals);
  899. /* LPREPL - check the return code.
  900. * But what do we do if mapping tree could not be updated ? */
  901. /* start the bulk import */
  902. slapi_pblock_set(pb, SLAPI_TARGET_SDN, repl_root_sdn);
  903. rc = slapi_start_bulk_import(pb);
  904. if (rc != LDAP_SUCCESS) {
  905. response = NSDS50_REPL_INTERNAL_ERROR;
  906. /* reset the mapping tree state to what it was before
  907. we tried to do the bulk import if mtnstate exists */
  908. if (mtnstate) {
  909. repl_set_mtn_state_and_referrals(repl_root_sdn, mtnstate,
  910. NULL, NULL, mtnreferral);
  911. slapi_ch_free_string(&mtnstate);
  912. }
  913. charray_free(mtnreferral);
  914. mtnreferral = NULL;
  915. goto send_response;
  916. }
  917. slapi_ch_free_string(&mtnstate);
  918. charray_free(mtnreferral);
  919. mtnreferral = NULL;
  920. }
  921. /* something unexpected at this point, like REPL_PROTOCOL_UNKNOWN */
  922. else {
  923. /* TEL 20120529: This condition isn't supposed to happen, but it
  924. * has been observed in the past when the consumer is under such
  925. * stress that the supplier sends additional start extops before
  926. * the consumer has finished processing an earlier one. Fixing
  927. * the underlying race should prevent this from happening in the
  928. * future at all, but just in case it is still worth testing the
  929. * requested protocol explictly and returning an error here rather
  930. * than assuming a total update was requested.
  931. * https://fedorahosted.org/389/ticket/374 */
  932. response = NSDS50_REPL_INTERNAL_ERROR;
  933. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  934. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  935. "conn=%" PRIu64 " op=%d repl=\"%s\": "
  936. "Unexpected update protocol received: %d. "
  937. "Expected incremental or total.\n",
  938. connid, opid, repl_root, connext->repl_protocol_version);
  939. goto send_response;
  940. }
  941. response = NSDS50_REPL_REPLICA_READY;
  942. /* Set the "is replication session" flag in the connection extension */
  943. slapi_pblock_set(pb, SLAPI_CONN_IS_REPLICATION_SESSION, &one);
  944. connext->isreplicationsession = 1;
  945. /* Save away the connection */
  946. slapi_pblock_get(pb, SLAPI_CONNECTION, &connext->connection);
  947. send_response:
  948. if (connext && replica &&
  949. (REPL_PROTOCOL_50_TOTALUPDATE == connext->repl_protocol_version)) {
  950. replica_set_state_flag(replica, REPLICA_TOTAL_EXCL_RECV, 1);
  951. }
  952. if (response != NSDS50_REPL_REPLICA_READY) {
  953. int resp_log_level = SLAPI_LOG_ERR;
  954. char purlstr[1024] = {0};
  955. if (current_purl)
  956. PR_snprintf(purlstr, sizeof(purlstr), " locked by %s for %s update", current_purl,
  957. isInc ? "incremental" : "total");
  958. /* Don't log replica busy as errors - these are almost always not
  959. errors - use the replication monitoring tools to determine if
  960. a replica is not converging, then look for pathological replica
  961. busy errors by turning on the replication log level. We also
  962. don't want to log replica backoff as an error, as that response
  963. is only used when a replication session hook wants a supplier to
  964. go into incremental backoff mode. */
  965. if ((response == NSDS50_REPL_REPLICA_BUSY) || (response == NSDS50_REPL_BACKOFF)) {
  966. resp_log_level = SLAPI_LOG_REPL;
  967. }
  968. slapi_log_err(resp_log_level,
  969. repl_plugin_name,
  970. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  971. "conn=%" PRIu64 " op=%d replica=\"%s\": "
  972. "Unable to acquire replica: error: %s%s\n",
  973. connid, opid,
  974. (replica ? slapi_sdn_get_dn(replica_get_root(replica)) : "unknown"),
  975. protocol_response2string(response), purlstr);
  976. /* enable tombstone reap again since the total update failed */
  977. replica_set_tombstone_reap_stop(replica, PR_FALSE);
  978. }
  979. /* Call any registered replica session reply callback. We
  980. * want to reject the updates if the return value is non-0. */
  981. if (repl_session_plugin_call_reply_acquire_cb(replica ? slapi_sdn_get_ndn(replica_get_root(replica)) : "",
  982. ((isInc == PR_TRUE) ? 0 : 1), &data_guid, &data)) {
  983. slapi_ch_free_string(&data_guid);
  984. ber_bvfree(data);
  985. data = NULL;
  986. response = NSDS50_REPL_BACKOFF;
  987. }
  988. /* Send the response */
  989. if ((resp_bere = der_alloc()) == NULL) {
  990. /* ONREPL - not sure what we suppose to do here */
  991. }
  992. ber_printf(resp_bere, "{e", response);
  993. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  994. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  995. "multisupplier_extop_StartNSDS50ReplicationRequest - encoded response: %d\n",
  996. response);
  997. }
  998. if (NULL != ruv_bervals) {
  999. ber_printf(resp_bere, "{V}", ruv_bervals);
  1000. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  1001. ruv_dump_to_log(ruv, "multisupplier_extop_StartNSDS50ReplicationRequest");
  1002. }
  1003. }
  1004. /* Add extra data from replication session callback if necessary */
  1005. if (is90 && data_guid && data) {
  1006. ber_printf(resp_bere, "sO", data_guid, data);
  1007. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  1008. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  1009. "multisupplier_extop_StartNSDS50ReplicationRequest - encoded data_guid (%s) data (%s:%ld)\n",
  1010. data_guid, data->bv_val, data->bv_len);
  1011. }
  1012. }
  1013. ber_printf(resp_bere, "}");
  1014. ber_flatten(resp_bere, &resp_bval);
  1015. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  1016. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  1017. "multisupplier_extop_StartNSDS50ReplicationRequest - Finished encoding payload\n");
  1018. }
  1019. if (is90) {
  1020. slapi_pblock_set(pb, SLAPI_EXT_OP_RET_OID, REPL_NSDS90_REPLICATION_RESPONSE_OID);
  1021. } else {
  1022. slapi_pblock_set(pb, SLAPI_EXT_OP_RET_OID, REPL_NSDS50_REPLICATION_RESPONSE_OID);
  1023. }
  1024. /* connext (release our hold on it at least) */
  1025. if (NULL != connext) {
  1026. /* don't free it, just let go of it */
  1027. consumer_connection_extension_relinquish_exclusive_access(conn, connid, opid, PR_FALSE);
  1028. }
  1029. slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval);
  1030. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  1031. "multisupplier_extop_StartNSDS50ReplicationRequest - "
  1032. "conn=%" PRIu64 " op=%d repl=\"%s\": "
  1033. "%s: response=%d rc=%d\n",
  1034. connid, opid, repl_root,
  1035. is90 ? "StartNSDS90ReplicationRequest" : "StartNSDS50ReplicationRequest", response, rc);
  1036. slapi_send_ldap_result(pb, LDAP_SUCCESS, NULL, NULL, 0, NULL);
  1037. return_value = SLAPI_PLUGIN_EXTENDED_SENT_RESULT;
  1038. /* Free any data allocated by the replication
  1039. * session reply callback. */
  1040. slapi_ch_free_string(&data_guid);
  1041. ber_bvfree(data);
  1042. data = NULL;
  1043. slapi_ch_free_string(&current_purl);
  1044. /* protocol_oid */
  1045. /* slapi_ch_free accepts NULL pointer */
  1046. slapi_ch_free((void **)&protocol_oid);
  1047. /* repl_root */
  1048. slapi_ch_free((void **)&repl_root);
  1049. /* supplier's ruv */
  1050. if (supplier_ruv) {
  1051. ruv_destroy(&supplier_ruv);
  1052. }
  1053. /* referrals (char **) */
  1054. slapi_ch_array_free(referrals);
  1055. /* replicacsnstr */
  1056. slapi_ch_free((void **)&replicacsnstr);
  1057. /* repl_root_sdn */
  1058. slapi_sdn_free(&repl_root_sdn);
  1059. if (NSDS50_REPL_REPLICA_READY != response) {
  1060. /*
  1061. * Something went wrong, and we never told the other end that the
  1062. * replica had been acquired, so we'd better release it.
  1063. */
  1064. if (NULL != connext && NULL != connext->replica_acquired) {
  1065. Replica *r = connext->replica_acquired;
  1066. uint64_t r_locking_conn;
  1067. /* At this point the supplier runs a Replica Agreement for
  1068. * the specific replica connext->replica_acquired.
  1069. * The RA does not know it holds the replica (because it is
  1070. * sending this request).
  1071. * The situation is confused
  1072. */
  1073. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "multisupplier_extop_StartNSDS50ReplicationRequest - "
  1074. "already acquired replica: replica not ready (%d) (replica=%s)\n",
  1075. response, replica_get_name(r) ? replica_get_name(r) : "no name");
  1076. /*
  1077. * On consumer side, we release the exclusive access at the
  1078. * condition this is this RA that holds the replica
  1079. */
  1080. if (r) {
  1081. r_locking_conn = replica_get_locking_conn(r);
  1082. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "multisupplier_extop_StartNSDS50ReplicationRequest - "
  1083. "already acquired replica: locking_conn=%" PRIu64 ", current connid=%" PRIu64 "\n",
  1084. r_locking_conn, connid);
  1085. if ((r_locking_conn != ULONG_MAX) && (r_locking_conn == connid)) {
  1086. replica_relinquish_exclusive_access(r, connid, opid);
  1087. connext->replica_acquired = NULL;
  1088. }
  1089. }
  1090. /*
  1091. * On consumer side we should not keep a incoming connection
  1092. * with replica_acquired set although the supplier is not aware of
  1093. *
  1094. * On the supplier, we need to close the connection so
  1095. * that the RA will restart a new session in a clear state
  1096. */
  1097. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "multisupplier_extop_StartNSDS50ReplicationRequest - "
  1098. "already acquired replica: disconnect conn=%" PRIu64 "\n",
  1099. connid);
  1100. slapi_disconnect_server(conn);
  1101. }
  1102. /* Remove any flags that would indicate repl session in progress */
  1103. if (NULL != connext) {
  1104. connext->repl_protocol_version = REPL_PROTOCOL_UNKNOWN;
  1105. connext->isreplicationsession = 0;
  1106. }
  1107. slapi_pblock_set(pb, SLAPI_CONN_IS_REPLICATION_SESSION, &zero);
  1108. }
  1109. /* bind_sdn */
  1110. if (NULL != bind_sdn) {
  1111. slapi_sdn_free(&bind_sdn);
  1112. }
  1113. /* Release reference to gen_obj */
  1114. if (NULL != gen_obj) {
  1115. object_release(gen_obj);
  1116. }
  1117. /* replicacsn */
  1118. if (NULL != replicacsn) {
  1119. csn_free(&replicacsn);
  1120. }
  1121. /* resp_bere */
  1122. if (NULL != resp_bere) {
  1123. ber_free(resp_bere, 1);
  1124. }
  1125. /* resp_bval */
  1126. if (NULL != resp_bval) {
  1127. ber_bvfree(resp_bval);
  1128. }
  1129. /* ruv_bervals */
  1130. if (NULL != ruv_bervals) {
  1131. ber_bvecfree(ruv_bervals);
  1132. }
  1133. return return_value;
  1134. }
  1135. /*
  1136. * This plugin entry point is called whenever an
  1137. * EndNSDS50ReplicationRequest is received.
  1138. * XXXggood this code is not finished.
  1139. */
  1140. int
  1141. multisupplier_extop_EndNSDS50ReplicationRequest(Slapi_PBlock *pb)
  1142. {
  1143. int return_value = SLAPI_PLUGIN_EXTENDED_NOT_HANDLED;
  1144. char *repl_root = NULL;
  1145. Slapi_DN *repl_root_sdn = NULL;
  1146. BerElement *resp_bere = NULL;
  1147. struct berval *resp_bval = NULL;
  1148. ber_int_t response;
  1149. void *conn;
  1150. consumer_connection_extension *connext = NULL;
  1151. PRUint64 connid = 0;
  1152. int opid = -1;
  1153. /* Decode the extended operation */
  1154. if (decode_endrepl_extop(pb, &repl_root) == -1) {
  1155. response = NSDS50_REPL_DECODING_ERROR;
  1156. } else {
  1157. /* First, verify that the current connection is a replication session */
  1158. /* XXXggood - do we need to wait around for any pending updates to complete?
  1159. I suppose it's possible that the end request may arrive asynchronously, before
  1160. we're really done processing all the updates.
  1161. */
  1162. /* Get a hold of the connection extension object */
  1163. slapi_pblock_get(pb, SLAPI_CONNECTION, &conn);
  1164. slapi_pblock_get(pb, SLAPI_OPERATION_ID, &opid);
  1165. if (opid)
  1166. slapi_pblock_get(pb, SLAPI_CONN_ID, &connid);
  1167. /* TEL 20120531: unlike the replica, exclusive access to the connext should
  1168. * have been dropped at the end of the 'start' op. the only reason we couldn't
  1169. * get access to it would be if some other start or end op currently has it.
  1170. * if that is the case, the result of our getting it would be unpredictable anyway.
  1171. */
  1172. connext = consumer_connection_extension_acquire_exclusive_access(conn, connid, opid);
  1173. if (NULL != connext && NULL != connext->replica_acquired) {
  1174. int zero = 0;
  1175. Replica *r = connext->replica_acquired;
  1176. /* if this is total protocol we need to install suppliers ruv for the replica */
  1177. if (connext->repl_protocol_version == REPL_PROTOCOL_50_TOTALUPDATE) {
  1178. /* We no longer need to refer all operations...
  1179. * and update the referrals on the mapping tree node
  1180. */
  1181. consumer5_set_mapping_tree_state_for_replica(r, NULL);
  1182. /* LPREPL - First we clear the total in progress flag
  1183. Like this we know it's a normal termination of import. This is required by
  1184. the replication function that responds to backend state change.
  1185. If the flag is not clear, the callback knows that replication should not be
  1186. enabled again */
  1187. replica_set_state_flag(r, REPLICA_TOTAL_IN_PROGRESS, PR_TRUE /* clear flag */);
  1188. /* slapi_pblock_set (pb, SLAPI_TARGET_DN, repl_root); */
  1189. /* Verify that repl_root names a valid replicated area */
  1190. if ((repl_root_sdn = slapi_sdn_new_dn_byref(repl_root)) == NULL) {
  1191. response = NSDS50_REPL_INTERNAL_ERROR;
  1192. goto send_response;
  1193. }
  1194. slapi_pblock_set(pb, SLAPI_TARGET_SDN, repl_root_sdn);
  1195. slapi_stop_bulk_import(pb);
  1196. /* ONREPL - this is a bit of a hack. Once bulk import is finished,
  1197. the replication function that responds to backend state change
  1198. will be called. That function normally do all ruv and changelog
  1199. processing. However, in the case of replica initalization, it
  1200. will not do the right thing because supplier does not send its
  1201. ruv tombstone to the consumer. So that's why we need to do the
  1202. second processing here.
  1203. The supplier does not send its RUV entry because it could be
  1204. more up to date then the data send to the consumer.
  1205. The best solution I think, would be to "fake" on the supplier
  1206. an entry that corresponds to the ruv sent to the consumer and then
  1207. send it as part of the data */
  1208. /*
  1209. if (cldb_is_open(r)) {
  1210. cl5DeleteDBSync(connext->replica_acquired);
  1211. }
  1212. no longer needed, the cl was recreated when replication was reenabled
  1213. at the end of bulk import
  1214. */
  1215. replica_set_ruv(r, connext->supplier_ruv);
  1216. connext->supplier_ruv = NULL;
  1217. /* if changelog is enabled, we need to log a dummy change for the
  1218. smallest csn in the new ruv, so that this replica ca supply
  1219. other servers.
  1220. */
  1221. if (replica_is_flag_set(r, REPLICA_LOG_CHANGES) && cldb_is_open(r)) {
  1222. replica_log_ruv_elements(r);
  1223. /* now that the changelog is open and started, we can alos cretae the
  1224. * keep alive entry without risk that db and cl will not match
  1225. */
  1226. replica_subentry_check(slapi_sdn_get_dn(replica_get_root(r)), replica_get_rid(r));
  1227. }
  1228. /* ONREPL code that dealt with new RUV, etc was moved into the code
  1229. that enables replication when a backend comes back online. This
  1230. code is called once the bulk import is finished */
  1231. /* allow reaping again */
  1232. replica_set_tombstone_reap_stop(r, PR_FALSE);
  1233. } else if (connext->repl_protocol_version == REPL_PROTOCOL_50_INCREMENTAL) {
  1234. /* The ruv from the supplier may have changed. Report the change on the
  1235. consumer side */
  1236. replica_update_ruv_consumer(r, connext->supplier_ruv);
  1237. }
  1238. /* Relinquish control of the replica */
  1239. replica_relinquish_exclusive_access(r, connid, opid);
  1240. connext->replica_acquired = NULL;
  1241. connext->isreplicationsession = 0;
  1242. slapi_pblock_set(pb, SLAPI_CONN_IS_REPLICATION_SESSION, &zero);
  1243. response = NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED;
  1244. /* Outbound replication agreements need to all be restarted now */
  1245. /* XXXGGOOD RESTART REEPL AGREEMENTS */
  1246. } else {
  1247. /* Unless bail out, we return uninitialized response */
  1248. goto free_and_return;
  1249. }
  1250. }
  1251. send_response:
  1252. /* connext (release our hold on it at least) */
  1253. if (NULL != connext) {
  1254. /* don't free it, just let go of it */
  1255. consumer_connection_extension_relinquish_exclusive_access(conn, connid, opid, PR_FALSE);
  1256. connext = NULL;
  1257. }
  1258. /* Send the response code */
  1259. if ((resp_bere = der_alloc()) == NULL) {
  1260. goto free_and_return;
  1261. }
  1262. if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
  1263. slapi_log_err(SLAPI_LOG_REPL, "multisupplier_extop_EndNSDS50ReplicationRequest",
  1264. "encoded response: %d\n", response);
  1265. }
  1266. ber_printf(resp_bere, "{e}", response);
  1267. ber_flatten(resp_bere, &resp_bval);
  1268. slapi_pblock_set(pb, SLAPI_EXT_OP_RET_OID, REPL_NSDS50_REPLICATION_RESPONSE_OID);
  1269. slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval);
  1270. slapi_send_ldap_result(pb, LDAP_SUCCESS, NULL, NULL, 0, NULL);
  1271. return_value = SLAPI_PLUGIN_EXTENDED_SENT_RESULT;
  1272. free_and_return:
  1273. /* repl_root */
  1274. slapi_ch_free((void **)&repl_root);
  1275. slapi_sdn_free(&repl_root_sdn);
  1276. /* BerElement */
  1277. if (NULL != resp_bere) {
  1278. ber_free(resp_bere, 1);
  1279. }
  1280. /* response */
  1281. if (NULL != resp_bval) {
  1282. ber_bvfree(resp_bval);
  1283. }
  1284. /* connext (release our hold on it if not already released) */
  1285. if (NULL != connext) {
  1286. /* don't free it, just let go of it */
  1287. consumer_connection_extension_relinquish_exclusive_access(conn, connid, opid, PR_FALSE);
  1288. }
  1289. return return_value;
  1290. }
  1291. /*
  1292. * Decode the ber element passed to us by the cleanAllRUV task
  1293. */
  1294. int
  1295. decode_cleanruv_payload(struct berval *extop_value, char **payload)
  1296. {
  1297. BerElement *tmp_bere = NULL;
  1298. int rc = 0;
  1299. if (!BV_HAS_DATA(extop_value)) {
  1300. rc = -1;
  1301. goto free_and_return;
  1302. }
  1303. if ((tmp_bere = ber_init(extop_value)) == NULL) {
  1304. rc = -1;
  1305. goto free_and_return;
  1306. }
  1307. if (ber_scanf(tmp_bere, "{") == LBER_ERROR) {
  1308. rc = -1;
  1309. goto free_and_return;
  1310. }
  1311. if (ber_get_stringa(tmp_bere, payload) == LBER_DEFAULT) {
  1312. rc = -1;
  1313. goto free_and_return;
  1314. }
  1315. if (ber_scanf(tmp_bere, "}") == LBER_ERROR) {
  1316. rc = -1;
  1317. goto free_and_return;
  1318. }
  1319. free_and_return:
  1320. if (-1 == rc) {
  1321. slapi_ch_free_string(payload);
  1322. }
  1323. if (NULL != tmp_bere) {
  1324. ber_free(tmp_bere, 1);
  1325. tmp_bere = NULL;
  1326. }
  1327. return rc;
  1328. }
  1329. int
  1330. multisupplier_extop_abort_cleanruv(Slapi_PBlock *pb)
  1331. {
  1332. PRThread *thread = NULL;
  1333. cleanruv_data *data;
  1334. Replica *r;
  1335. ReplicaId rid;
  1336. struct berval *extop_payload = NULL;
  1337. char *extop_oid;
  1338. char *repl_root;
  1339. char *payload = NULL;
  1340. char *certify_all;
  1341. char *iter = NULL;
  1342. int rc = LDAP_SUCCESS;
  1343. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid);
  1344. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_payload);
  1345. if (NULL == extop_oid || strcmp(extop_oid, REPL_ABORT_CLEANRUV_OID) != 0 ||
  1346. NULL == extop_payload || NULL == extop_payload->bv_val) {
  1347. /* something is wrong, error out */
  1348. return LDAP_OPERATIONS_ERROR;
  1349. }
  1350. /*
  1351. * Decode the payload, and grab our settings
  1352. */
  1353. if (decode_cleanruv_payload(extop_payload, &payload)) {
  1354. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multisupplier_extop_abort_cleanruv - "
  1355. "Abort CleanAllRUV Task - Failed to decode payload. Aborting ext op\n");
  1356. return LDAP_OPERATIONS_ERROR;
  1357. }
  1358. rid = atoi(ldap_utf8strtok_r(payload, ":", &iter));
  1359. repl_root = ldap_utf8strtok_r(iter, ":", &iter);
  1360. certify_all = ldap_utf8strtok_r(iter, ":", &iter);
  1361. if (!is_cleaned_rid(rid) || !is_pre_cleaned_rid(rid) || is_task_aborted(rid)) {
  1362. /* This replica has already been aborted, or was never cleaned, or already finished cleaning */
  1363. goto out;
  1364. } else {
  1365. slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name, "multisupplier_extop_abort_cleanruv - "
  1366. "Abort CleanAllRUV Task - Aborting cleanallruv task for rid(%d)\n",
  1367. rid);
  1368. }
  1369. /*
  1370. * Get the replica
  1371. */
  1372. if ((r = replica_get_replica_from_root(repl_root)) == NULL) {
  1373. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multisupplier_extop_abort_cleanruv - "
  1374. "Abort CleanAllRUV Task - Replica is NULL, aborting task\n");
  1375. rc = LDAP_OPERATIONS_ERROR;
  1376. goto out;
  1377. }
  1378. if (check_and_set_abort_cleanruv_task_count() != LDAP_SUCCESS) {
  1379. cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR,
  1380. "Exceeded maximum number of active abort CLEANALLRUV tasks(%d)", CLEANRIDSIZ);
  1381. rc = LDAP_UNWILLING_TO_PERFORM;
  1382. goto out;
  1383. }
  1384. /*
  1385. * Prepare the abort data
  1386. */
  1387. data = (cleanruv_data *)slapi_ch_calloc(1, sizeof(cleanruv_data));
  1388. if (data == NULL) {
  1389. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "multisupplier_extop_abort_cleanruv - "
  1390. "Abort CleanAllRUV Task - Failed to allocate "
  1391. "abort_cleanruv_data. Aborting task.\n");
  1392. rc = LDAP_OPERATIONS_ERROR;
  1393. goto out;
  1394. }
  1395. data->replica = r;
  1396. data->task = NULL;
  1397. data->payload = slapi_ch_bvdup(extop_payload);
  1398. data->rid = rid;
  1399. data->repl_root = slapi_ch_strdup(repl_root);
  1400. data->certify = slapi_ch_strdup(certify_all);
  1401. data->original_task = PR_FALSE;
  1402. /*
  1403. * Set the aborted rid and stop the cleaning
  1404. */
  1405. add_aborted_rid(rid, r, repl_root, data->certify, data->original_task);
  1406. stop_ruv_cleaning();
  1407. /*
  1408. * Send out the extended ops to the replicas
  1409. */
  1410. thread = PR_CreateThread(PR_USER_THREAD, replica_abort_task_thread,
  1411. (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
  1412. PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE);
  1413. if (thread == NULL) {
  1414. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "multisupplier_extop_abort_cleanruv - "
  1415. "Abort CleanAllRUV Task - Unable to create abort "
  1416. "thread. Aborting task.\n");
  1417. slapi_ch_free_string(&data->repl_root);
  1418. slapi_ch_free_string(&data->certify);
  1419. ber_bvfree(data->payload);
  1420. slapi_ch_free((void **)&data);
  1421. rc = LDAP_OPERATIONS_ERROR;
  1422. }
  1423. out:
  1424. slapi_ch_free_string(&payload);
  1425. return rc;
  1426. }
  1427. /*
  1428. * Process the REPL_CLEANRUV_OID extended operation.
  1429. *
  1430. * The payload consists of the replica ID, repl root dn, and the maxcsn. Since this is
  1431. * basically a replication operation, it could of originated here and bounced
  1432. * back from another supplier. So check the rid against the "cleaned_rid". If
  1433. * it's a match, then we were already here, and we can just return success.
  1434. *
  1435. * Otherwise, we the set the cleaned_rid from the payload, fire off extended ops
  1436. * to all the replica agreements on this replica. Then perform the actual
  1437. * cleanruv_task on this replica.
  1438. */
  1439. int
  1440. multisupplier_extop_cleanruv(Slapi_PBlock *pb)
  1441. {
  1442. PRThread *thread = NULL;
  1443. Replica *replica = NULL;
  1444. cleanruv_data *data = NULL;
  1445. CSN *maxcsn = NULL;
  1446. struct berval *extop_payload;
  1447. struct berval *resp_bval = NULL;
  1448. BerElement *resp_bere = NULL;
  1449. char *payload = NULL;
  1450. char *csnstr = NULL;
  1451. char *force = NULL;
  1452. char *extop_oid;
  1453. char *repl_root;
  1454. char *iter = NULL;
  1455. int rid = 0;
  1456. int rc = LDAP_OPERATIONS_ERROR;
  1457. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid);
  1458. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_payload);
  1459. if (NULL == extop_oid || strcmp(extop_oid, REPL_CLEANRUV_OID) != 0 ||
  1460. NULL == extop_payload || NULL == extop_payload->bv_val) {
  1461. /* something is wrong, error out */
  1462. goto free_and_return;
  1463. }
  1464. /*
  1465. * Decode the payload
  1466. */
  1467. if (decode_cleanruv_payload(extop_payload, &payload)) {
  1468. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multisupplier_extop_cleanruv - CleanAllRUV Task - Failed to decode payload. Aborting ext op\n");
  1469. goto free_and_return;
  1470. }
  1471. rid = atoi(ldap_utf8strtok_r(payload, ":", &iter));
  1472. repl_root = ldap_utf8strtok_r(iter, ":", &iter);
  1473. csnstr = ldap_utf8strtok_r(iter, ":", &iter);
  1474. force = ldap_utf8strtok_r(iter, ":", &iter);
  1475. if (force == NULL) {
  1476. force = "no";
  1477. }
  1478. maxcsn = csn_new();
  1479. csn_init_by_string(maxcsn, csnstr);
  1480. /*
  1481. * If we already cleaned this server, just return success
  1482. */
  1483. if (is_cleaned_rid(rid) || is_pre_cleaned_rid(rid) || is_task_aborted(rid)) {
  1484. csn_free(&maxcsn);
  1485. rc = LDAP_SUCCESS;
  1486. goto free_and_return;
  1487. }
  1488. /*
  1489. * Get the node, so we can get the replica and its agreements
  1490. */
  1491. replica = replica_get_replica_from_root(repl_root);
  1492. if (replica == NULL) {
  1493. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multisupplier_extop_cleanruv - CleanAllRUV Task - Replica is NULL, aborting task\n");
  1494. goto free_and_return;
  1495. }
  1496. if (check_and_set_cleanruv_task_count((ReplicaId)rid) != LDAP_SUCCESS) {
  1497. cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR,
  1498. "Exceeded maximum number of active CLEANALLRUV tasks(%d)", CLEANRIDSIZ);
  1499. rc = LDAP_UNWILLING_TO_PERFORM;
  1500. goto free_and_return;
  1501. }
  1502. if (replica_get_type(replica) != REPLICA_TYPE_READONLY) {
  1503. /*
  1504. * Launch the cleanruv monitoring thread. Once all the replicas are cleaned it will release the rid
  1505. */
  1506. cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Launching cleanAllRUV thread...");
  1507. data = (cleanruv_data *)slapi_ch_calloc(1, sizeof(cleanruv_data));
  1508. if (data == NULL) {
  1509. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multisupplier_extop_cleanruv - CleanAllRUV Task - Failed to allocate "
  1510. "cleanruv_Data\n");
  1511. goto free_and_return;
  1512. }
  1513. data->replica = replica;
  1514. data->rid = rid;
  1515. data->task = NULL;
  1516. data->maxcsn = maxcsn;
  1517. data->payload = slapi_ch_bvdup(extop_payload);
  1518. data->force = slapi_ch_strdup(force);
  1519. data->repl_root = slapi_ch_strdup(repl_root);
  1520. data->original_task = PR_FALSE;
  1521. thread = PR_CreateThread(PR_USER_THREAD, replica_cleanallruv_thread_ext,
  1522. (void *)data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
  1523. PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE);
  1524. if (thread == NULL) {
  1525. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multisupplier_extop_cleanruv - CleanAllRUV Task - Unable to create cleanAllRUV "
  1526. "monitoring thread. Aborting task.\n");
  1527. ber_bvfree(data->payload);
  1528. data->payload = NULL;
  1529. slapi_ch_free_string(&data->force);
  1530. slapi_ch_free_string(&data->repl_root);
  1531. slapi_ch_free((void **)&data);
  1532. } else {
  1533. maxcsn = NULL; /* thread owns it now */
  1534. rc = LDAP_SUCCESS;
  1535. }
  1536. } else { /* this is a read-only consumer */
  1537. /*
  1538. * wait for the maxcsn to be covered
  1539. */
  1540. Object *ruv_obj;
  1541. const RUV *ruv;
  1542. ruv_obj = replica_get_ruv(replica);
  1543. ruv = object_get_data(ruv_obj);
  1544. while (!is_task_aborted(rid) && !slapi_is_shutting_down()) {
  1545. if (!ruv_contains_replica(ruv, rid)) {
  1546. /* we've already been cleaned */
  1547. break;
  1548. }
  1549. slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name, "multisupplier_extop_cleanruv - CleanAllRUV Task - Checking if we're caught up...\n");
  1550. if (ruv_covers_csn_cleanallruv(ruv, maxcsn) || csn_get_replicaid(maxcsn) == 0 || strcmp(force, "yes") == 0) {
  1551. /* We are caught up */
  1552. break;
  1553. } else {
  1554. char csn_str[CSN_STRSIZE];
  1555. csn_as_string(maxcsn, PR_FALSE, csn_str);
  1556. slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "multisupplier_extop_cleanruv - CleanAllRUV Task - Not ruv caught up maxcsn(%s)\n", csnstr);
  1557. }
  1558. DS_Sleep(PR_SecondsToInterval(5));
  1559. }
  1560. slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name, "multisupplier_extop_cleanruv - CleanAllRUV Task - We're caught up...\n");
  1561. /*
  1562. * Set cleaned rid in memory only - does not survive a server restart
  1563. */
  1564. set_cleaned_rid(rid);
  1565. /*
  1566. * Clean the ruv
  1567. */
  1568. replica_execute_cleanruv_task_ext(replica, rid);
  1569. /* free everything */
  1570. object_release(ruv_obj);
  1571. /*
  1572. * This read-only replica has no easy way to tell when it's safe to release the rid.
  1573. * So we won't release it, not until a server restart.
  1574. */
  1575. slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name, "multisupplier_extop_cleanruv - CleanAllRUV Task - You must restart the server if you want to reuse rid(%d).\n", rid);
  1576. slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name, "multisupplier_extop_cleanruv - CleanAllRUV Task - Successfully cleaned rid(%d).\n", rid);
  1577. rc = LDAP_SUCCESS;
  1578. }
  1579. free_and_return:
  1580. csn_free(&maxcsn);
  1581. slapi_ch_free_string(&payload);
  1582. /*
  1583. * Craft a message so we know this replica supports the task
  1584. */
  1585. if ((resp_bere = der_alloc())) {
  1586. ber_printf(resp_bere, "{s}", CLEANRUV_ACCEPTED);
  1587. ber_flatten(resp_bere, &resp_bval);
  1588. slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval);
  1589. slapi_send_ldap_result(pb, rc, NULL, NULL, 0, NULL);
  1590. /* resp_bere */
  1591. if (NULL != resp_bere) {
  1592. ber_free(resp_bere, 1);
  1593. }
  1594. /* resp_bval */
  1595. if (NULL != resp_bval) {
  1596. ber_bvfree(resp_bval);
  1597. }
  1598. /* tell extendop code that we have already sent the result */
  1599. rc = SLAPI_PLUGIN_EXTENDED_SENT_RESULT;
  1600. } else {
  1601. rc = LDAP_OPERATIONS_ERROR;
  1602. }
  1603. return rc;
  1604. }
  1605. /*
  1606. * Get the max csn for the designated repl area
  1607. */
  1608. int
  1609. multisupplier_extop_cleanruv_get_maxcsn(Slapi_PBlock *pb)
  1610. {
  1611. struct berval *resp_bval = NULL;
  1612. struct berval *extop_payload;
  1613. BerElement *resp_bere = NULL;
  1614. char *extop_oid = NULL;
  1615. char *base_dn = NULL;
  1616. char *payload = NULL;
  1617. char *maxcsn = NULL;
  1618. char *iter = NULL;
  1619. int rid = 0;
  1620. int rc = LDAP_OPERATIONS_ERROR;
  1621. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid);
  1622. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_payload);
  1623. if (NULL == extop_oid || strcmp(extop_oid, REPL_CLEANRUV_GET_MAXCSN_OID) != 0 ||
  1624. NULL == extop_payload || NULL == extop_payload->bv_val) {
  1625. /* something is wrong, error out */
  1626. goto free_and_return;
  1627. }
  1628. /*
  1629. * Decode the payload
  1630. */
  1631. if (decode_cleanruv_payload(extop_payload, &payload)) {
  1632. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multisupplier_extop_cleanruv_get_maxcsn - CleanAllRUV Task - Get MaxCSN Task: failed to decode payload. Aborting ext op\n");
  1633. goto free_and_return;
  1634. }
  1635. rid = atoi(ldap_utf8strtok_r(payload, ":", &iter));
  1636. base_dn = ldap_utf8strtok_r(iter, ":", &iter);
  1637. maxcsn = replica_cleanallruv_get_local_maxcsn(rid, base_dn);
  1638. if (maxcsn == NULL) {
  1639. maxcsn = slapi_ch_strdup(CLEANRUV_NO_MAXCSN);
  1640. }
  1641. /*
  1642. * Send the extended op response
  1643. */
  1644. if ((resp_bere = der_alloc())) {
  1645. ber_printf(resp_bere, "{s}", maxcsn);
  1646. ber_flatten(resp_bere, &resp_bval);
  1647. slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval);
  1648. slapi_send_ldap_result(pb, LDAP_SUCCESS, NULL, NULL, 0, NULL);
  1649. /* resp_bere */
  1650. if (NULL != resp_bere) {
  1651. ber_free(resp_bere, 1);
  1652. }
  1653. /* resp_bval */
  1654. if (NULL != resp_bval) {
  1655. ber_bvfree(resp_bval);
  1656. }
  1657. /* tell extendop code that we have already sent the result */
  1658. rc = SLAPI_PLUGIN_EXTENDED_SENT_RESULT;
  1659. } else {
  1660. rc = LDAP_OPERATIONS_ERROR;
  1661. }
  1662. free_and_return:
  1663. slapi_ch_free_string(&payload);
  1664. slapi_ch_free_string(&maxcsn);
  1665. return rc;
  1666. }
  1667. /*
  1668. * Search cn=config for the cleanallruv attributes (clean & abort)
  1669. */
  1670. int
  1671. multisupplier_extop_cleanruv_check_status(Slapi_PBlock *pb)
  1672. {
  1673. Slapi_PBlock *search_pb = NULL;
  1674. Slapi_Entry **entries = NULL;
  1675. struct berval *resp_bval = NULL;
  1676. struct berval *extop_payload;
  1677. BerElement *resp_bere = NULL;
  1678. char *response = NULL;
  1679. char *filter = NULL;
  1680. char *extop_oid;
  1681. int res = 0;
  1682. int rc = LDAP_OPERATIONS_ERROR;
  1683. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid);
  1684. slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_payload);
  1685. if (NULL == extop_oid || strcmp(extop_oid, REPL_CLEANRUV_CHECK_STATUS_OID) != 0 ||
  1686. NULL == extop_payload || NULL == extop_payload->bv_val) {
  1687. /* something is wrong, error out */
  1688. goto free_and_return;
  1689. }
  1690. /*
  1691. * Decode the payload - which should just be a filter
  1692. */
  1693. if (decode_cleanruv_payload(extop_payload, &filter)) {
  1694. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multisupplier_extop_cleanruv_check_status - "
  1695. "CleanAllRUV Task - Check Status Task: failed to decode payload. Aborting ext op\n");
  1696. goto free_and_return;
  1697. }
  1698. search_pb = slapi_pblock_new();
  1699. slapi_search_internal_set_pb(search_pb, "cn=config", LDAP_SCOPE_SUBTREE,
  1700. filter, NULL, 0, NULL, NULL, repl_get_plugin_identity(PLUGIN_MULTISUPPLIER_REPLICATION), 0);
  1701. slapi_search_internal_pb(search_pb);
  1702. slapi_pblock_get(search_pb, SLAPI_PLUGIN_INTOP_RESULT, &res);
  1703. if (LDAP_SUCCESS == res) {
  1704. slapi_pblock_get(search_pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries);
  1705. if (NULL == entries || entries[0] == NULL) {
  1706. /* cleaning task has finished, send repsonse */
  1707. response = CLEANRUV_FINISHED;
  1708. } else {
  1709. response = CLEANRUV_CLEANING;
  1710. }
  1711. /*
  1712. * Send the extended op response
  1713. */
  1714. if ((resp_bere = der_alloc())) {
  1715. ber_printf(resp_bere, "{s}", response);
  1716. ber_flatten(resp_bere, &resp_bval);
  1717. slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval);
  1718. slapi_send_ldap_result(pb, LDAP_SUCCESS, NULL, NULL, 0, NULL);
  1719. /* resp_bere */
  1720. if (NULL != resp_bere) {
  1721. ber_free(resp_bere, 1);
  1722. }
  1723. /* resp_bval */
  1724. if (NULL != resp_bval) {
  1725. ber_bvfree(resp_bval);
  1726. }
  1727. /* tell extendop code that we have already sent the result */
  1728. rc = SLAPI_PLUGIN_EXTENDED_SENT_RESULT;
  1729. }
  1730. }
  1731. free_and_return:
  1732. slapi_free_search_results_internal(search_pb);
  1733. slapi_pblock_destroy(search_pb);
  1734. slapi_ch_free_string(&filter);
  1735. return rc;
  1736. }
  1737. /*
  1738. * This plugin entry point is a noop entry
  1739. * point. It's used when registering extops that
  1740. * are only used as responses. We'll never receive
  1741. * one of those, unsolicited, but we still want to
  1742. * register them so they appear in the
  1743. * supportedextension attribute in the root DSE.
  1744. */
  1745. int
  1746. extop_noop(Slapi_PBlock *pb __attribute__((unused)))
  1747. {
  1748. return SLAPI_PLUGIN_EXTENDED_NOT_HANDLED;
  1749. }
  1750. static int
  1751. check_replica_id_uniqueness(Replica *replica, RUV *supplier_ruv)
  1752. {
  1753. ReplicaId local_rid = replica_get_rid(replica);
  1754. ReplicaId sup_rid = 0;
  1755. char *sup_purl = NULL;
  1756. if (ruv_get_first_id_and_purl(supplier_ruv, &sup_rid, &sup_purl) == RUV_SUCCESS) {
  1757. /* ReplicaID Uniqueness is checked only on Suppliers/Providers */
  1758. if ((replica_get_type(replica) == REPLICA_TYPE_UPDATABLE) &&
  1759. (sup_rid == local_rid)) {
  1760. return 1;
  1761. }
  1762. }
  1763. return 0;
  1764. }