repl_connext.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. /** BEGIN COPYRIGHT BLOCK
  2. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  3. * Copyright (C) 2005 Red Hat, Inc.
  4. * All rights reserved.
  5. *
  6. * License: GPL (version 3 or any later version).
  7. * See LICENSE for details.
  8. * END COPYRIGHT BLOCK **/
  9. #ifdef HAVE_CONFIG_H
  10. #include <config.h>
  11. #endif
  12. /* repl_connext.c - replication extension to the Connection object
  13. */
  14. #include "repl5.h"
  15. /* ***** Supplier side ***** */
  16. /* NOT NEEDED YET */
  17. /* ***** Consumer side ***** */
  18. /* consumer connection extension constructor */
  19. void *
  20. consumer_connection_extension_constructor(void *object __attribute__((unused)), void *parent __attribute__((unused)))
  21. {
  22. consumer_connection_extension *ext = (consumer_connection_extension *)slapi_ch_malloc(sizeof(consumer_connection_extension));
  23. if (ext == NULL) {
  24. slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name, "consumer_connection_extension_constructor - "
  25. "Unable to create replication consumer connection extension - out of memory\n");
  26. } else {
  27. ext->repl_protocol_version = REPL_PROTOCOL_UNKNOWN;
  28. ext->replica_acquired = NULL;
  29. ext->isreplicationsession = 0;
  30. ext->supplier_ruv = NULL;
  31. ext->connection = NULL;
  32. ext->in_use_opid = -1;
  33. ext->lock = PR_NewLock();
  34. if (NULL == ext->lock) {
  35. slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name, "consumer_connection_extension_constructor - "
  36. "Unable to create replication consumer connection extension lock - out of memory\n");
  37. /* no need to go through the full destructor, but still need to free up this memory */
  38. slapi_ch_free((void **)&ext);
  39. ext = NULL;
  40. }
  41. }
  42. return ext;
  43. }
  44. /* consumer connection extension destructor */
  45. void
  46. consumer_connection_extension_destructor(void *ext, void *object __attribute__((unused)), void *parent __attribute__((unused)))
  47. {
  48. PRUint64 connid = 0;
  49. if (ext) {
  50. /* Check to see if this replication session has acquired
  51. * a replica. If so, release it here.
  52. */
  53. consumer_connection_extension *connext = (consumer_connection_extension *)ext;
  54. if (NULL != connext->replica_acquired) {
  55. Replica *r = object_get_data((Object *)connext->replica_acquired);
  56. /* If a total update was in progress, abort it */
  57. if (REPL_PROTOCOL_50_TOTALUPDATE == connext->repl_protocol_version) {
  58. Slapi_PBlock *pb = slapi_pblock_new();
  59. const Slapi_DN *repl_root_sdn = replica_get_root(r);
  60. PR_ASSERT(NULL != repl_root_sdn);
  61. if (NULL != repl_root_sdn) {
  62. slapi_pblock_set(pb, SLAPI_CONNECTION, connext->connection);
  63. slapi_pblock_set(pb, SLAPI_TARGET_SDN, (void *)repl_root_sdn);
  64. slapi_pblock_get(pb, SLAPI_CONN_ID, &connid);
  65. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  66. "consumer_connection_extension_destructor - "
  67. "Aborting total update in progress for replicated "
  68. "area %s connid=%" PRIu64 "\n",
  69. slapi_sdn_get_dn(repl_root_sdn), connid);
  70. slapi_stop_bulk_import(pb);
  71. } else {
  72. slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name,
  73. "consumer_connection_extension_destructor - Can't determine root "
  74. "of replicated area.\n");
  75. }
  76. slapi_pblock_destroy(pb);
  77. /* allow reaping again */
  78. replica_set_tombstone_reap_stop(r, PR_FALSE);
  79. }
  80. replica_relinquish_exclusive_access(r, connid, -1);
  81. object_release((Object *)connext->replica_acquired);
  82. connext->replica_acquired = NULL;
  83. }
  84. if (connext->supplier_ruv) {
  85. ruv_destroy((RUV **)&connext->supplier_ruv);
  86. }
  87. if (connext->lock) {
  88. PR_DestroyLock(connext->lock);
  89. connext->lock = NULL;
  90. }
  91. connext->in_use_opid = -1;
  92. connext->connection = NULL;
  93. slapi_ch_free((void **)&ext);
  94. }
  95. }
  96. /* Obtain exclusive access to this connection extension.
  97. * Returns the consumer_connection_extension* if successful, else NULL.
  98. *
  99. * This is similar to obtaining exclusive access to the replica, but not identical.
  100. * For the connection extension, you only want to hold on to exclusive access as
  101. * long as it is being actively used to process an operation. Mainly that means
  102. * while processing either a 'start' or an 'end' extended operation. This makes
  103. * certain that if another 'start' or 'end' operation is received on the connection,
  104. * the ops will not trample on each other's state. As soon as you are done with
  105. * that single operation, it is time to relinquish the connection extension.
  106. * That differs from acquiring exclusive access to the replica, which is held over
  107. * after the 'start' operation and relinquished during the 'end' operation.
  108. */
  109. consumer_connection_extension *
  110. consumer_connection_extension_acquire_exclusive_access(void *conn, PRUint64 connid, int opid)
  111. {
  112. consumer_connection_extension *ret = NULL;
  113. /* step 1, grab the connext */
  114. consumer_connection_extension *connext = (consumer_connection_extension *)
  115. repl_con_get_ext(REPL_CON_EXT_CONN, conn);
  116. if (NULL != connext) {
  117. /* step 2, acquire its lock */
  118. PR_Lock(connext->lock);
  119. /* step 3, see if it is not in use, or in use by us */
  120. if (0 > connext->in_use_opid) {
  121. /* step 4, take it! */
  122. connext->in_use_opid = opid;
  123. ret = connext;
  124. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  125. "consumer_connection_extension_acquire_exclusive_access - "
  126. "conn=%" PRIu64 " op=%d Acquired consumer connection extension\n",
  127. connid, opid);
  128. } else if (opid == connext->in_use_opid) {
  129. ret = connext;
  130. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  131. "consumer_connection_extension_acquire_exclusive_access - "
  132. "conn=%" PRIu64 " op=%d Reacquired consumer connection extension\n",
  133. connid, opid);
  134. } else {
  135. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  136. "consumer_connection_extension_acquire_exclusive_access - "
  137. "conn=%" PRIu64 " op=%d Could not acquire consumer connection extension; it is in use by op=%d\n",
  138. connid, opid, connext->in_use_opid);
  139. }
  140. /* step 5, drop the lock */
  141. PR_Unlock(connext->lock);
  142. } else {
  143. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  144. "consumer_connection_extension_acquire_exclusive_access - "
  145. "conn=%" PRIu64 " op=%d Could not acquire consumer extension, it is NULL!\n",
  146. connid, opid);
  147. }
  148. return ret;
  149. }
  150. /* Relinquish exclusive access to this connection extension.
  151. * Returns 0 if exclusive access could NOT be relinquished, and non-zero if it was.
  152. * Specifically:
  153. * 1 if the extension was in use and was relinquished.
  154. * 2 if the extension was not in use to begin with.
  155. *
  156. * The extension will only be relinquished if it was first acquired by this op,
  157. * or if 'force' is TRUE. Do not use 'force' without a legitimate reason, such
  158. * as when destroying the parent connection.
  159. *
  160. * cf. consumer_connection_extension_acquire_exclusive_access() for details on how,
  161. * when, and why you would want to acquire and relinquish exclusive access.
  162. */
  163. int
  164. consumer_connection_extension_relinquish_exclusive_access(void *conn, PRUint64 connid, int opid, PRBool force)
  165. {
  166. int ret = 0;
  167. /* step 1, grab the connext */
  168. consumer_connection_extension *connext = (consumer_connection_extension *)
  169. repl_con_get_ext(REPL_CON_EXT_CONN, conn);
  170. if (NULL != connext) {
  171. /* step 2, acquire its lock */
  172. PR_Lock(connext->lock);
  173. /* step 3, see if it is in use */
  174. if (0 > connext->in_use_opid) {
  175. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  176. "consumer_connection_extension_relinquish_exclusive_access - "
  177. "conn=%" PRIu64 " op=%d Consumer connection extension is not in use\n",
  178. connid, opid);
  179. ret = 2;
  180. } else if (opid == connext->in_use_opid) {
  181. /* step 4, relinquish it (normal) */
  182. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  183. "consumer_connection_extension_relinquish_exclusive_access - "
  184. "conn=%" PRIu64 " op=%d Relinquishing consumer connection extension\n",
  185. connid, opid);
  186. connext->in_use_opid = -1;
  187. ret = 1;
  188. } else if (force) {
  189. /* step 4, relinquish it (forced) */
  190. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  191. "consumer_connection_extension_relinquish_exclusive_access - "
  192. "conn=%" PRIu64 " op=%d Forced to relinquish consumer connection extension held by op=%d\n",
  193. connid, opid, connext->in_use_opid);
  194. connext->in_use_opid = -1;
  195. ret = 1;
  196. } else {
  197. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  198. "consumer_connection_extension_relinquish_exclusive_access - "
  199. "conn=%" PRIu64 " op=%d Not relinquishing consumer connection extension, it is held by op=%d!\n",
  200. connid, opid, connext->in_use_opid);
  201. }
  202. /* step 5, drop the lock */
  203. PR_Unlock(connext->lock);
  204. } else {
  205. slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
  206. "consumer_connection_extension_relinquish_exclusive_access - "
  207. "conn=%" PRIu64 " op=%d Could not relinquish consumer extension, it is NULL!\n",
  208. connid, opid);
  209. }
  210. return ret;
  211. }