connection.c 80 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618
  1. /** BEGIN COPYRIGHT BLOCK
  2. * This Program is free software; you can redistribute it and/or modify it under
  3. * the terms of the GNU General Public License as published by the Free Software
  4. * Foundation; version 2 of the License.
  5. *
  6. * This Program is distributed in the hope that it will be useful, but WITHOUT
  7. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  8. * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  9. *
  10. * You should have received a copy of the GNU General Public License along with
  11. * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple
  12. * Place, Suite 330, Boston, MA 02111-1307 USA.
  13. *
  14. * In addition, as a special exception, Red Hat, Inc. gives You the additional
  15. * right to link the code of this Program with code not covered under the GNU
  16. * General Public License ("Non-GPL Code") and to distribute linked combinations
  17. * including the two, subject to the limitations in this paragraph. Non-GPL Code
  18. * permitted under this exception must only link to the code of this Program
  19. * through those well defined interfaces identified in the file named EXCEPTION
  20. * found in the source code files (the "Approved Interfaces"). The files of
  21. * Non-GPL Code may instantiate templates or use macros or inline functions from
  22. * the Approved Interfaces without causing the resulting work to be covered by
  23. * the GNU General Public License. Only Red Hat, Inc. may make changes or
  24. * additions to the list of Approved Interfaces. You must obey the GNU General
  25. * Public License in all respects for all of the Program code and other code used
  26. * in conjunction with the Program except the Non-GPL Code covered by this
  27. * exception. If you modify this file, you may extend this exception to your
  28. * version of the file, but you are not obligated to do so. If you do not wish to
  29. * provide this exception without modification, you must delete this exception
  30. * statement from your version and license this file solely under the GPL without
  31. * exception.
  32. *
  33. *
  34. * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
  35. * Copyright (C) 2005 Red Hat, Inc.
  36. * All rights reserved.
  37. * END COPYRIGHT BLOCK **/
  38. #ifdef HAVE_CONFIG_H
  39. # include <config.h>
  40. #endif
  41. #include <stdio.h>
  42. #include <string.h>
  43. #include <sys/types.h>
  44. #ifndef _WIN32
  45. #include <sys/time.h>
  46. #include <sys/socket.h>
  47. #include <stdlib.h>
  48. #endif
  49. #include <signal.h>
  50. #include "slap.h"
  51. #include "prcvar.h"
  52. #include "prlog.h" /* for PR_ASSERT */
  53. #include "fe.h"
  54. #include <sasl.h>
  55. #if defined(LINUX)
  56. #include <netinet/tcp.h> /* for TCP_CORK */
  57. #endif
  58. static void connection_threadmain( void );
  59. static void add_pb( Slapi_PBlock * );
  60. static Slapi_PBlock *get_pb( void );
  61. static void connection_add_operation(Connection* conn, Operation *op);
  62. static void connection_free_private_buffer(Connection *conn);
  63. static void op_copy_identity(Connection *conn, Operation *op);
  64. static int is_ber_too_big(const Connection *conn, ber_len_t ber_len);
  65. static void log_ber_too_big_error(const Connection *conn,
  66. ber_len_t ber_len, ber_len_t maxbersize);
  67. /*
  68. * We maintain a global work queue of Slapi_PBlock's that have not yet
  69. * been handed off to an operation thread.
  70. */
  71. struct Slapi_PBlock_q
  72. {
  73. Slapi_PBlock *pb;
  74. struct Slapi_PBlock_q *next_pb;
  75. int pb_fd;
  76. };
  77. static struct Slapi_PBlock_q *first_pb= NULL; /* global work queue head */
  78. static struct Slapi_PBlock_q *last_pb= NULL; /* global work queue tail */
  79. static PRLock *pb_q_lock=NULL; /* protects first_pb & last_pb */
  80. static PRCondVar *op_thread_cv; /* used by operation threads to wait for work */
  81. static PRLock *op_thread_lock; /* associated with op_thread_cv */
  82. static int op_shutdown= 0; /* if non-zero, server is shutting down */
  83. #define LDAP_SOCKET_IO_BUFFER_SIZE 512 /* Size of the buffer we give to the I/O system for reads */
  84. /*
  85. * We really are done with this connection. Get rid of everything.
  86. *
  87. * Note: this function should be called with conn->c_mutex already locked
  88. * or at a time when multiple threads are not in play that might touch the
  89. * connection structure.
  90. */
  91. void
  92. connection_done(Connection *conn)
  93. {
  94. connection_cleanup(conn);
  95. /* free the private content, the buffer has been freed by above connection_cleanup */
  96. slapi_ch_free((void**)&conn->c_private);
  97. if (NULL != conn->c_sb)
  98. {
  99. ber_sockbuf_free(conn->c_sb);
  100. }
  101. if (NULL != conn->c_mutex)
  102. {
  103. PR_DestroyLock(conn->c_mutex);
  104. }
  105. if (NULL != conn->c_pdumutex)
  106. {
  107. PR_DestroyLock(conn->c_pdumutex);
  108. }
  109. }
  110. /*
  111. * We're going to be making use of this connection again.
  112. * So, get rid of everything we can't make use of.
  113. *
  114. * Note: this function should be called with conn->c_mutex already locked
  115. * or at a time when multiple threads are not in play that might touch the
  116. * connection structure.
  117. */
  118. void
  119. connection_cleanup(Connection *conn)
  120. {
  121. bind_credentials_clear( conn, PR_FALSE /* do not lock conn */,
  122. PR_TRUE /* clear external creds. */ );
  123. slapi_ch_free((void**)&conn->c_authtype);
  124. /* Call the plugin extension destructors */
  125. factory_destroy_extension(connection_type,conn,NULL/*Parent*/,&(conn->c_extension));
  126. /*
  127. * We hang onto these, since we can reuse them.
  128. * Sockbuf *c_sb;
  129. * PRLock *c_mutex;
  130. * PRLock *c_pdumutex;
  131. * Conn_private *c_private;
  132. */
  133. #ifdef _WIN32
  134. if (conn->c_prfd && (conn->c_flags & CONN_FLAG_SSL))
  135. {
  136. LDAPDebug( LDAP_DEBUG_CONNS,
  137. "conn=%" PRIu64 " fd=%d closed now\n",
  138. conn->c_connid, conn->c_sd,0);
  139. PR_Close(conn->c_prfd);
  140. }
  141. else if (conn->c_sd)
  142. {
  143. LDAPDebug( LDAP_DEBUG_CONNS,
  144. "conn=%" PRIu64 " fd=%d closed now\n",
  145. conn->c_connid, conn->c_sd,0);
  146. closesocket(conn->c_sd);
  147. }
  148. #else
  149. if (conn->c_prfd)
  150. {
  151. PR_Close(conn->c_prfd);
  152. }
  153. #endif
  154. conn->c_sd= SLAPD_INVALID_SOCKET;
  155. conn->c_ldapversion= 0;
  156. conn->c_isreplication_session = 0;
  157. slapi_ch_free((void**)&conn->cin_addr );
  158. slapi_ch_free((void**)&conn->cin_destaddr );
  159. if ( conn->c_domain != NULL )
  160. {
  161. ber_bvecfree( conn->c_domain );
  162. conn->c_domain = NULL;
  163. }
  164. /* conn->c_ops= NULL; */
  165. conn->c_gettingber= 0;
  166. conn->c_currentber= NULL;
  167. conn->c_starttime= 0;
  168. conn->c_connid= 0;
  169. conn->c_opsinitiated= 0;
  170. conn->c_opscompleted= 0;
  171. conn->c_threadnumber= 0;
  172. conn->c_refcnt= 0;
  173. conn->c_idlesince= 0;
  174. conn->c_flags= 0;
  175. conn->c_needpw= 0;
  176. conn->c_prfd= NULL;
  177. /* c_ci stays as it is */
  178. conn->c_fdi= SLAPD_INVALID_SOCKET_INDEX;
  179. conn->c_next= NULL;
  180. conn->c_prev= NULL;
  181. conn->c_extension= NULL;
  182. /* remove any SASL I/O from the connection */
  183. sasl_io_cleanup(conn);
  184. sasl_dispose((sasl_conn_t**)&conn->c_sasl_conn);
  185. /* free the connection socket buffer */
  186. connection_free_private_buffer(conn);
  187. }
  188. /*
  189. * Callers of connection_reset() must hold the conn->c_mutex lock.
  190. */
  191. void
  192. connection_reset(Connection* conn, int ns, PRNetAddr * from, int fromLen, int is_SSL)
  193. {
  194. char * pTmp = is_SSL ? "SSL " : "";
  195. char *str_ip = NULL, *str_destip;
  196. char buf_ip[ 256 ], buf_destip[ 256 ];
  197. char *str_unknown = "unknown";
  198. int in_referral_mode = config_check_referral_mode();
  199. LDAPDebug( LDAP_DEBUG_CONNS, "new %sconnection on %d\n", pTmp, conn->c_sd, 0 );
  200. /* bump our count of connections and update SNMP stats */
  201. conn->c_connid = slapi_counter_increment(num_conns);
  202. if (! in_referral_mode) {
  203. slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsConnectionSeq);
  204. slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsConnections);
  205. }
  206. /*
  207. * get peer address (IP address of this client)
  208. */
  209. slapi_ch_free( (void**)&conn->cin_addr ); /* just to be conservative */
  210. if ( ((from->ipv6.ip.pr_s6_addr32[0] != 0) || /* from contains non zeros */
  211. (from->ipv6.ip.pr_s6_addr32[1] != 0) ||
  212. (from->ipv6.ip.pr_s6_addr32[2] != 0) ||
  213. (from->ipv6.ip.pr_s6_addr32[3] != 0)) ||
  214. ((conn->c_prfd != NULL) && (PR_GetPeerName( conn->c_prfd, from ) == 0)) ) {
  215. conn->cin_addr = (PRNetAddr *) slapi_ch_malloc( sizeof( PRNetAddr ) );
  216. memcpy( conn->cin_addr, from, sizeof( PRNetAddr ) );
  217. if ( PR_IsNetAddrType( conn->cin_addr, PR_IpAddrV4Mapped ) ) {
  218. PRNetAddr v4addr;
  219. memset( &v4addr, 0, sizeof( v4addr ) );
  220. v4addr.inet.family = PR_AF_INET;
  221. v4addr.inet.ip = conn->cin_addr->ipv6.ip.pr_s6_addr32[3];
  222. PR_NetAddrToString( &v4addr, buf_ip, sizeof( buf_ip ) );
  223. } else {
  224. PR_NetAddrToString( conn->cin_addr, buf_ip, sizeof( buf_ip ) );
  225. }
  226. buf_ip[ sizeof( buf_ip ) - 1 ] = '\0';
  227. str_ip = buf_ip;
  228. } else {
  229. /* try syscall since "from" was not given and PR_GetPeerName failed */
  230. /* a corner case */
  231. struct sockaddr_in addr; /* assuming IPv4 */
  232. #if ( defined( hpux ) )
  233. int addrlen;
  234. #else
  235. socklen_t addrlen;
  236. #endif
  237. addrlen = sizeof( addr );
  238. memset( &addr, 0, addrlen );
  239. if ( (conn->c_prfd == NULL) &&
  240. (getpeername( conn->c_sd, (struct sockaddr *)&addr, &addrlen )
  241. == 0) ) {
  242. conn->cin_addr = (PRNetAddr *)slapi_ch_malloc( sizeof( PRNetAddr ));
  243. memset( conn->cin_addr, 0, sizeof( PRNetAddr ) );
  244. PR_NetAddrFamily( conn->cin_addr ) = AF_INET6;
  245. /* note: IPv4-mapped IPv6 addr does not work on Windows */
  246. PR_ConvertIPv4AddrToIPv6(addr.sin_addr.s_addr, &(conn->cin_addr->ipv6.ip));
  247. PRLDAP_SET_PORT(conn->cin_addr, addr.sin_port);
  248. /* copy string equivalent of address into a buffer to use for
  249. * logging since each call to inet_ntoa() returns a pointer to a
  250. * single thread-specific buffer (which prevents us from calling
  251. * inet_ntoa() twice in one call to slapi_log_access()).
  252. */
  253. str_ip = inet_ntoa( addr.sin_addr );
  254. strncpy( buf_ip, str_ip, sizeof( buf_ip ) - 1 );
  255. buf_ip[ sizeof( buf_ip ) - 1 ] = '\0';
  256. str_ip = buf_ip;
  257. } else {
  258. str_ip = str_unknown;
  259. }
  260. }
  261. /*
  262. * get destination address (server IP address this client connected to)
  263. */
  264. slapi_ch_free( (void**)&conn->cin_destaddr ); /* just to be conservative */
  265. if ( conn->c_prfd != NULL ) {
  266. conn->cin_destaddr = (PRNetAddr *) slapi_ch_malloc( sizeof( PRNetAddr ) );
  267. memset( conn->cin_destaddr, 0, sizeof( PRNetAddr ));
  268. if (PR_GetSockName( conn->c_prfd, conn->cin_destaddr ) == 0) {
  269. if ( PR_IsNetAddrType( conn->cin_destaddr, PR_IpAddrV4Mapped ) ) {
  270. PRNetAddr v4destaddr;
  271. memset( &v4destaddr, 0, sizeof( v4destaddr ) );
  272. v4destaddr.inet.family = PR_AF_INET;
  273. v4destaddr.inet.ip = conn->cin_destaddr->ipv6.ip.pr_s6_addr32[3];
  274. PR_NetAddrToString( &v4destaddr, buf_destip, sizeof( buf_destip ) );
  275. } else {
  276. PR_NetAddrToString( conn->cin_destaddr, buf_destip, sizeof( buf_destip ) );
  277. }
  278. buf_destip[ sizeof( buf_destip ) - 1 ] = '\0';
  279. str_destip = buf_destip;
  280. } else {
  281. str_destip = str_unknown;
  282. }
  283. } else {
  284. /* try syscall since c_prfd == NULL */
  285. /* a corner case */
  286. struct sockaddr_in destaddr; /* assuming IPv4 */
  287. #if ( defined( hpux ) )
  288. int destaddrlen;
  289. #else
  290. socklen_t destaddrlen;
  291. #endif
  292. destaddrlen = sizeof( destaddr );
  293. memset( &destaddr, 0, destaddrlen );
  294. if ( (getsockname( conn->c_sd, (struct sockaddr *)&destaddr,
  295. &destaddrlen ) == 0) ) {
  296. conn->cin_destaddr =
  297. (PRNetAddr *)slapi_ch_malloc( sizeof( PRNetAddr ));
  298. memset( conn->cin_destaddr, 0, sizeof( PRNetAddr ));
  299. PR_NetAddrFamily( conn->cin_destaddr ) = AF_INET6;
  300. PRLDAP_SET_PORT( conn->cin_destaddr, destaddr.sin_port );
  301. /* note: IPv4-mapped IPv6 addr does not work on Windows */
  302. PR_ConvertIPv4AddrToIPv6(destaddr.sin_addr.s_addr,
  303. &(conn->cin_destaddr->ipv6.ip));
  304. /* copy string equivalent of address into a buffer to use for
  305. * logging since each call to inet_ntoa() returns a pointer to a
  306. * single thread-specific buffer (which prevents us from calling
  307. * inet_ntoa() twice in one call to slapi_log_access()).
  308. */
  309. str_destip = inet_ntoa( destaddr.sin_addr );
  310. strncpy( buf_destip, str_destip, sizeof( buf_destip ) - 1 );
  311. buf_destip[ sizeof( buf_destip ) - 1 ] = '\0';
  312. str_destip = buf_destip;
  313. } else {
  314. str_destip = str_unknown;
  315. }
  316. }
  317. if ( !in_referral_mode ) {
  318. /* create a sasl connection */
  319. ids_sasl_server_new(conn);
  320. }
  321. /* log useful stuff to our access log */
  322. slapi_log_access( LDAP_DEBUG_STATS,
  323. "conn=%" NSPRIu64 " fd=%d slot=%d %sconnection from %s to %s\n",
  324. conn->c_connid, conn->c_sd, ns, pTmp, str_ip, str_destip );
  325. /* initialize the remaining connection fields */
  326. conn->c_ldapversion = LDAP_VERSION3;
  327. conn->c_starttime = current_time();
  328. conn->c_idlesince = conn->c_starttime;
  329. conn->c_flags = is_SSL ? CONN_FLAG_SSL : 0;
  330. conn->c_authtype = slapi_ch_strdup(SLAPD_AUTH_NONE);
  331. }
  332. /* Create a pool of threads for handling the operations */
  333. void
  334. init_op_threads()
  335. {
  336. int i;
  337. PRErrorCode errorCode;
  338. int max_threads = config_get_threadnumber();
  339. /* Initialize the locks and cv */
  340. if ((pb_q_lock = PR_NewLock()) == NULL ) {
  341. errorCode = PR_GetError();
  342. LDAPDebug( LDAP_DEBUG_ANY,
  343. "init_op_threads: PR_NewLock failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  344. errorCode, slapd_pr_strerror(errorCode), 0 );
  345. exit(-1);
  346. }
  347. if ((op_thread_lock = PR_NewLock()) == NULL ) {
  348. errorCode = PR_GetError();
  349. LDAPDebug( LDAP_DEBUG_ANY,
  350. "init_op_threads: PR_NewLock failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  351. errorCode, slapd_pr_strerror(errorCode), 0 );
  352. exit(-1);
  353. }
  354. if ((op_thread_cv = PR_NewCondVar( op_thread_lock )) == NULL) {
  355. errorCode = PR_GetError();
  356. LDAPDebug( LDAP_DEBUG_ANY, "init_op_threads: PR_NewCondVar failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  357. errorCode, slapd_pr_strerror(errorCode), 0 );
  358. exit(-1);
  359. }
  360. /* start the operation threads */
  361. for (i=0; i < max_threads; i++) {
  362. PR_SetConcurrency(4);
  363. if (PR_CreateThread (PR_USER_THREAD,
  364. (VFP) (void *) connection_threadmain, NULL,
  365. PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
  366. PR_UNJOINABLE_THREAD,
  367. SLAPD_DEFAULT_THREAD_STACKSIZE
  368. ) == NULL ) {
  369. int prerr = PR_GetError();
  370. LDAPDebug( LDAP_DEBUG_ANY, "PR_CreateThread failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
  371. prerr, slapd_pr_strerror( prerr ), 0 );
  372. } else {
  373. g_incr_active_threadcnt();
  374. }
  375. }
  376. }
  377. static void
  378. referral_mode_reply(Slapi_PBlock *pb)
  379. {
  380. struct slapdplugin *plugin;
  381. plugin = (struct slapdplugin *) slapi_ch_calloc(1, sizeof(struct slapdplugin));
  382. if (plugin!=NULL)
  383. {
  384. struct berval *urls[2], url;
  385. char *refer;
  386. refer = config_get_referral_mode();
  387. pb->pb_plugin = plugin;
  388. set_db_default_result_handlers(pb);
  389. urls[0] = &url;
  390. urls[1] = NULL;
  391. url.bv_val = refer;
  392. url.bv_len = refer ? strlen(refer) : 0;
  393. slapi_send_ldap_result(pb, LDAP_REFERRAL, NULL, NULL, 0, urls);
  394. slapi_ch_free((void **)&plugin);
  395. slapi_ch_free((void **)&refer);
  396. }
  397. }
  398. static int
  399. connection_need_new_password(const Connection *conn, const Operation *op, Slapi_PBlock *pb)
  400. {
  401. int r= 0;
  402. /*
  403. * add tag != LDAP_REQ_SEARCH to allow admin server 3.5 to do
  404. * searches when the user needs to reset
  405. * the pw the first time logon.
  406. * LP: 22 Dec 2000: Removing LDAP_REQ_SEARCH. It's very unlikely that AS 3.5 will
  407. * be used to manage DS5.0
  408. */
  409. if ( conn->c_needpw && op->o_tag != LDAP_REQ_MODIFY &&
  410. op->o_tag != LDAP_REQ_BIND && op->o_tag != LDAP_REQ_UNBIND &&
  411. op->o_tag != LDAP_REQ_ABANDON && op->o_tag != LDAP_REQ_EXTENDED)
  412. {
  413. slapi_add_pwd_control ( pb, LDAP_CONTROL_PWEXPIRED, 0);
  414. slapi_log_access( LDAP_DEBUG_STATS, "conn=%" NSPRIu64 " op=%d %s\n",
  415. pb->pb_conn->c_connid, pb->pb_op->o_opid,
  416. "need new password" );
  417. send_ldap_result( pb, LDAP_UNWILLING_TO_PERFORM,
  418. NULL, NULL, 0, NULL );
  419. r= 1;
  420. }
  421. return r;
  422. }
  423. static void
  424. connection_dispatch_operation(Connection *conn, Operation *op, Slapi_PBlock *pb)
  425. {
  426. /* Copy the Connection DN into the operation struct */
  427. op_copy_identity( conn, op );
  428. /* process the operation */
  429. switch ( op->o_tag ) {
  430. case LDAP_REQ_BIND:
  431. operation_set_type(op,SLAPI_OPERATION_BIND);
  432. do_bind( pb );
  433. break;
  434. case LDAP_REQ_UNBIND:
  435. operation_set_type(op,SLAPI_OPERATION_UNBIND);
  436. do_unbind( pb );
  437. break;
  438. case LDAP_REQ_ADD:
  439. operation_set_type(op,SLAPI_OPERATION_ADD);
  440. do_add( pb );
  441. break;
  442. case LDAP_REQ_DELETE:
  443. operation_set_type(op,SLAPI_OPERATION_DELETE);
  444. do_delete( pb );
  445. break;
  446. case LDAP_REQ_MODRDN:
  447. operation_set_type(op,SLAPI_OPERATION_MODRDN);
  448. do_modrdn( pb );
  449. break;
  450. case LDAP_REQ_MODIFY:
  451. operation_set_type(op,SLAPI_OPERATION_MODIFY);
  452. do_modify( pb );
  453. break;
  454. case LDAP_REQ_COMPARE:
  455. operation_set_type(op,SLAPI_OPERATION_COMPARE);
  456. do_compare( pb );
  457. break;
  458. case LDAP_REQ_SEARCH:
  459. operation_set_type(op,SLAPI_OPERATION_SEARCH);
  460. /* On Linux we can use TCP_CORK to get us 5-10% speed benefit when one entry is returned */
  461. /* Nagle needs to be turned _off_, the default is off on linux, in daemon.c */
  462. #if defined(LINUX)
  463. {
  464. int i = 1;
  465. int ret = 0;
  466. /* Set TCP_CORK here but only if this is not LDAPI */
  467. if(!conn->c_unix_local)
  468. {
  469. ret = setsockopt(conn->c_sd,IPPROTO_TCP,TCP_CORK,&i,sizeof(i));
  470. if (ret < 0) {
  471. LDAPDebug(LDAP_DEBUG_ANY, "Failed to set TCP_CORK on connection %" NSPRIu64 "\n",conn->c_connid, 0, 0);
  472. }
  473. }
  474. #endif
  475. do_search( pb );
  476. #if defined(LINUX)
  477. /* Clear TCP_CORK to flush any unsent data but only if not LDAPI*/
  478. i = 0;
  479. if(!conn->c_unix_local)
  480. {
  481. ret = setsockopt(conn->c_sd,IPPROTO_TCP,TCP_CORK,&i,sizeof(i));
  482. if (ret < 0) {
  483. LDAPDebug(LDAP_DEBUG_ANY, "Failed to clear TCP_CORK on connection %" NSPRIu64 "\n",conn->c_connid, 0, 0);
  484. }
  485. }
  486. }
  487. #endif
  488. break;
  489. /* for some strange reason, the console is using this old obsolete
  490. * value for ABANDON so we have to support it until the console
  491. * get fixed
  492. * otherwise the console has VERY BAD performances when a fair amount
  493. * of entries are created in the DIT
  494. */
  495. case LDAP_REQ_ABANDON_30:
  496. case LDAP_REQ_ABANDON:
  497. operation_set_type(op,SLAPI_OPERATION_ABANDON);
  498. do_abandon( pb );
  499. break;
  500. case LDAP_REQ_EXTENDED:
  501. operation_set_type(op,SLAPI_OPERATION_EXTENDED);
  502. do_extended( pb );
  503. break;
  504. default:
  505. LDAPDebug( LDAP_DEBUG_ANY,
  506. "ignoring unknown LDAP request (conn=%" NSPRIu64 ", tag=0x%lx)\n",
  507. conn->c_connid, op->o_tag, 0 );
  508. break;
  509. }
  510. }
  511. /* this function should be called under c_mutex */
  512. int connection_release_nolock (Connection *conn)
  513. {
  514. if (conn->c_refcnt <= 0)
  515. {
  516. slapi_log_error(SLAPI_LOG_FATAL, "connection",
  517. "conn=%" NSPRIu64 " fd=%d Attempt to release connection that is not aquired\n",
  518. conn->c_connid, conn->c_sd);
  519. PR_ASSERT (PR_FALSE);
  520. return -1;
  521. }
  522. else
  523. {
  524. conn->c_refcnt--;
  525. return 0;
  526. }
  527. }
  528. /* this function should be called under c_mutex */
  529. int connection_acquire_nolock (Connection *conn)
  530. {
  531. /* connection in the closing state can't be acquired */
  532. if (conn->c_flags & CONN_FLAG_CLOSING)
  533. {
  534. /* This may happen while other threads are still working on this connection */
  535. slapi_log_error(SLAPI_LOG_FATAL, "connection",
  536. "conn=%" NSPRIu64 " fd=%d Attempt to acquire connection in the closing state\n",
  537. conn->c_connid, conn->c_sd);
  538. return -1;
  539. }
  540. else
  541. {
  542. conn->c_refcnt++;
  543. return 0;
  544. }
  545. }
  546. /* returns non-0 if connection can be reused and 0 otherwise */
  547. int connection_is_free (Connection *conn)
  548. {
  549. int rc;
  550. PR_Lock(conn->c_mutex);
  551. rc = conn->c_sd == SLAPD_INVALID_SOCKET && conn->c_refcnt == 0 &&
  552. !(conn->c_flags & CONN_FLAG_CLOSING);
  553. PR_Unlock(conn->c_mutex);
  554. return rc;
  555. }
  556. int connection_is_active_nolock (Connection *conn)
  557. {
  558. return (conn->c_sd != SLAPD_INVALID_SOCKET) &&
  559. !(conn->c_flags & CONN_FLAG_CLOSING);
  560. }
  561. /* returns non-0 if this is an active connection meaning it is in use
  562. and not in the closing mode */
  563. #if defined LDAP_IOCP
  564. /*
  565. * IO Completion ports are currently only available on NT.
  566. */
  567. typedef enum {read_data, write_data, new_connection} work_type;
  568. static int wait_on_new_work(Connection **ppConn, work_type *type);
  569. static int issue_new_read(Connection *conn);
  570. static int finished_chomping(Connection *conn);
  571. static int read_the_data(Connection *op, int *process_op, int *defer_io, int *defer_pushback);
  572. static int is_new_operation(Connection *conn);
  573. static int process_operation(Connection *conn, Operation *op);
  574. static int connection_operation_new(Connection *conn, Operation **ppOp);
  575. Operation *get_current_op(Connection *conn);
  576. static int handle_read_data(Connection *conn,Operation **op,
  577. int * connection_referenced);
  578. int queue_pushed_back_data(Connection *conn);
  579. static int add_to_select_set(Connection *conn);
  580. static void inc_op_count(Connection* conn)
  581. {
  582. PR_AtomicIncrement(&conn->c_opscompleted);
  583. slapi_counter_increment(ops_completed);
  584. }
  585. static int connection_increment_reference(Connection *conn)
  586. {
  587. int rc = 0;
  588. PR_Lock( conn->c_mutex );
  589. rc = connection_acquire_nolock (conn);
  590. PR_Unlock( conn->c_mutex );
  591. return rc;
  592. }
  593. static void connection_decrement_reference(Connection *conn)
  594. {
  595. PR_Lock( conn->c_mutex );
  596. connection_release_nolock (conn);
  597. PR_Unlock( conn->c_mutex );
  598. }
  599. static void
  600. connection_threadmain()
  601. {
  602. /*
  603. * OK, so this is the thread main routine for the thread pool.
  604. * This is the general idea : wait on the i/o completion port.
  605. * then get some data. There are three cases here:
  606. * 1) This is the first piece of data read for a new LDAP op.
  607. * 2) This is a subsequent, but not final, piece of data read in the current LDAP op on this connection
  608. * 3) This is the last piece of the current LDAP op on the current connection.
  609. * Note that these cases are NOT exclusive ! In particular, all three can occur for the same read.
  610. * based on detecting these cases, we end up doing one or more of the following things:
  611. * a) Create new structures for a new op.
  612. * b) Read data into the BER buffer for the op.
  613. * c) Press on to service the operation request (note that the results are currently written
  614. * synchronously.
  615. * We always queue a new read on the socket too.
  616. * (Note, we need to make sure we don't issue the new read operation until we've copied
  617. * the data from the existing one. Otherwise we'd open ourselves to getting OOO data.)
  618. *
  619. * The intention is that this code will be clean enough to be used for the UNIX build,
  620. * once we fake up I/O completion ports with select and another thread.
  621. */
  622. Connection *conn = NULL;
  623. Operation *op = NULL;
  624. int return_value = -1;
  625. int abandon_connection = 0;
  626. work_type command = 0;
  627. int connection_referenced = 0;
  628. /* Don't ask me, and I will tell you no lies */
  629. #if defined( OSF1 ) || defined( hpux ) || defined( LINUX )
  630. /* Arrange to ignore SIGPIPE signals. */
  631. SIGNAL( SIGPIPE, SIG_IGN );
  632. #endif
  633. while (1) {
  634. abandon_connection = 1; /* we start off assuming that we'll fail somewhere */
  635. conn = NULL; /* just make sure we don't step on an old connection by mistake */
  636. op = NULL; /* Same goes for the operation */
  637. return_value = wait_on_new_work(&conn,&command);
  638. if( op_shutdown )
  639. break;
  640. if (0 == return_value) {
  641. connection_referenced = 0; /* No outstanding ref count on connection if wait for work returned OK */
  642. switch (command) {
  643. case read_data:
  644. return_value = handle_read_data(conn,&op,&connection_referenced);
  645. if (0 == return_value)
  646. {
  647. abandon_connection = 0;
  648. }
  649. break;
  650. case write_data:
  651. /* NYI, but we need to go and find the state for the connection, find the operation
  652. * which queued the write, and then get whatever data we need to write, then write it ! */
  653. break;
  654. case new_connection:
  655. /* NYI, but this would consist of the same stuff which is currently in daemon.c.
  656. * On NT, we'd use AcceptEx() */
  657. break;
  658. default:
  659. break;
  660. }
  661. finished_chomping(conn);
  662. } else {
  663. PR_SetError(PR_IO_ERROR, return_value);
  664. connection_referenced = 1; /* There is an outstanding refcnt on the conn, so we get to close the right one ! */
  665. }
  666. /* If anything went wrong with the connection above, such that we need to
  667. * disconnect it, we'll know here and shoot it in the foot.
  668. */
  669. if ( (NULL != conn) && abandon_connection) {
  670. disconnect_server(conn, conn->c_connid, op ? op->o_opid : -1, SLAPD_DISCONNECT_ABORT, 0 );
  671. if (connection_referenced) {
  672. connection_decrement_reference(conn);
  673. }
  674. }
  675. }
  676. g_decr_active_threadcnt();
  677. }
  678. static int handle_read_data(Connection *conn,Operation **op,
  679. int * connection_referenced)
  680. {
  681. int return_value = 0;
  682. int return_value2 = 0;
  683. int process_op = 0; /* Do we or do we not process a complete operation now ? */
  684. int defer_io = 0;
  685. int defer_pushback = 0;
  686. if (is_new_operation(conn)) {
  687. return_value = connection_operation_new(conn,op);
  688. } else {
  689. *op = get_current_op(conn);
  690. }
  691. /* if connection is closing */
  692. if (return_value != 0) {
  693. LDAPDebug(LDAP_DEBUG_CONNS,
  694. "handle_read_data returns as conn %" NSPRIu64 " closing, fd=%d\n",
  695. conn->c_connid,conn->c_sd,0);
  696. return return_value;
  697. }
  698. return_value = read_the_data(conn,&process_op, &defer_io, &defer_pushback);
  699. if (0 == return_value) {
  700. int replication_session = conn->c_isreplication_session;
  701. if (0 != process_op)
  702. return_value = process_operation(conn,*op);
  703. /* Post any pending I/O operation _after_ processing any operation */
  704. if (replication_session) {
  705. /* Initiate any deferred I/O here */
  706. if (defer_io) {
  707. if (conn->c_flags & CONN_FLAG_SSL) {
  708. add_to_select_set(conn);
  709. return_value2 = 0;
  710. } else {
  711. return_value2 = issue_new_read(conn);
  712. }
  713. }
  714. if (defer_pushback) {
  715. return_value2 = queue_pushed_back_data(conn);
  716. }
  717. }
  718. }
  719. else
  720. *connection_referenced = 1;
  721. if (return_value) {
  722. return return_value;
  723. } else {
  724. return return_value2;
  725. }
  726. }
  727. /* Function which does the work involved in servicing an LDAP operation. */
  728. static int process_operation(Connection *conn, Operation *op)
  729. {
  730. Slapi_PBlock *pb = NULL;
  731. ber_len_t len;
  732. ber_tag_t tag;
  733. ber_int_t msgid;
  734. int return_value = 0;
  735. int destroy_content = 1;
  736. pb = (Slapi_PBlock *) slapi_ch_calloc( 1, sizeof(Slapi_PBlock) );
  737. pb->pb_conn = conn;
  738. pb->pb_op = op;
  739. /* destroy operation content when done */
  740. slapi_pblock_set (pb, SLAPI_DESTROY_CONTENT, &destroy_content);
  741. if (! config_check_referral_mode()) {
  742. slapi_counter_increment(ops_initiated);
  743. slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps);
  744. }
  745. if ( (tag = ber_get_int( op->o_ber, &msgid ))
  746. != LDAP_TAG_MSGID ) {
  747. /* log, close and send error */
  748. LDAPDebug( LDAP_DEBUG_ANY,
  749. "conn=%" NSPRIu64 " unable to read tag for incoming request\n", conn->c_connid, 0, 0 );
  750. return_value = -1;
  751. goto done;
  752. }
  753. op->o_msgid = msgid;
  754. tag = ber_peek_tag( op->o_ber, &len );
  755. switch ( tag ) {
  756. case LBER_ERROR:
  757. case LDAP_TAG_LDAPDN: /* optional username, for CLDAP */
  758. /* log, close and send error */
  759. LDAPDebug( LDAP_DEBUG_ANY,
  760. "conn=%" NSPRIu64 " ber_peek_tag returns 0x%lx\n", conn->c_connid, tag, 0 );
  761. return_value = -1;
  762. goto done;
  763. default:
  764. break;
  765. }
  766. op->o_tag = tag;
  767. /* are we in referral-only mode? */
  768. if (config_check_referral_mode() && tag != LDAP_REQ_UNBIND)
  769. {
  770. referral_mode_reply(pb);
  771. goto done;
  772. }
  773. /* check if new password is required */
  774. if(connection_need_new_password(conn, op, pb))
  775. {
  776. goto done;
  777. }
  778. /* if this is a bulk import, only "add" and "import done (extop)" are
  779. * allowed */
  780. if (conn->c_flags & CONN_FLAG_IMPORT) {
  781. if ((tag != LDAP_REQ_ADD) && (tag != LDAP_REQ_EXTENDED)) {
  782. /* no cookie for you. */
  783. LDAPDebug(LDAP_DEBUG_ANY, "Attempted operation %d from "
  784. "within bulk import\n", tag, 0, 0);
  785. slapi_send_ldap_result(pb, LDAP_PROTOCOL_ERROR, NULL, NULL,
  786. 0, NULL);
  787. return_value = -1;
  788. goto done;
  789. }
  790. }
  791. /*
  792. * Call the do_<operation> function to process this request.
  793. */
  794. connection_dispatch_operation(conn, op, pb);
  795. done:
  796. /* If we're here, it means that we successfully completed an operation , so bump the counts */
  797. inc_op_count(conn);
  798. if ( !( pb->pb_op->o_flags & OP_FLAG_PS )) {
  799. /*
  800. * If not a persistent search, remove the operation
  801. * from this connection's list.
  802. */
  803. PR_Lock( conn->c_mutex );
  804. connection_remove_operation( conn, op );
  805. PR_Unlock( conn->c_mutex );
  806. /* destroying the pblock will cause destruction of the operation
  807. * so this must happen before releasing the connection
  808. */
  809. slapi_pblock_destroy( pb );
  810. PR_Lock( conn->c_mutex );
  811. if (connection_release_nolock (conn) != 0)
  812. {
  813. return_value = -1;
  814. }
  815. PR_Unlock( conn->c_mutex );
  816. }
  817. return return_value;
  818. }
  819. /* Helper functions for the code above: */
  820. struct Conn_private {
  821. /* First the platform-dependent part */
  822. #ifdef _WIN32
  823. OVERLAPPED c_overlapped;
  824. DWORD c_buffer_size;
  825. char *c_buffer;
  826. DWORD c_number_of_async_bytes_read;
  827. DWORD c_buffer_offset;
  828. DWORD c_deferred_length;
  829. #else
  830. #endif
  831. /* Now the platform independent part */
  832. Operation *c_current_op;
  833. int c_flags;
  834. };
  835. static void connection_free_private_buffer(Connection *conn)
  836. {
  837. #ifdef _WIN32
  838. if (NULL != conn->c_private) {
  839. slapi_ch_free( (void**)&conn->c_private->c_buffer);
  840. }
  841. #else
  842. #endif
  843. }
  844. #define FLAG_CONN_HAD_SOME 1 /* Set when we've read the first piece of data already, means we don't need to allocate a new op */
  845. #define FLAG_CONN_COMPLETE 2 /* Set when we've read all of an LDAP operation request, means we can proceed to process it */
  846. /* Little helper functions */
  847. Operation *get_current_op(Connection *conn)
  848. {
  849. Operation *return_op = conn->c_private->c_current_op;
  850. PR_ASSERT(NULL != return_op);
  851. return return_op;
  852. }
  853. static int is_new_operation(Connection *conn)
  854. {
  855. if (0 == conn->c_private->c_flags) {
  856. return 1;
  857. } else {
  858. return 0;
  859. }
  860. }
  861. /* Called when a new operation comes in on a connection */
  862. static int connection_operation_new(Connection *conn, Operation **ppOp)
  863. {
  864. /* we need to make a new operation structure and chain it onto the connection */
  865. Operation *temp_op = NULL;
  866. int rc;
  867. PR_Lock( conn->c_mutex );
  868. if (connection_is_active_nolock(conn) == 0) {
  869. LDAPDebug(LDAP_DEBUG_CONNS,
  870. "not creating a new operation when conn %" NSPRIu64 " closing\n",
  871. conn->c_connid,0,0);
  872. PR_Unlock( conn->c_mutex );
  873. return -1;
  874. }
  875. temp_op = operation_new( plugin_build_operation_action_bitmap( 0,
  876. plugin_get_server_plg() ));
  877. connection_add_operation( conn, temp_op);
  878. rc = connection_acquire_nolock (conn);
  879. PR_Unlock( conn->c_mutex );
  880. /* Stash the op pointer in the connection structure for later use */
  881. PR_ASSERT(NULL == conn->c_private->c_current_op);
  882. conn->c_private->c_current_op = temp_op;
  883. *ppOp = temp_op;
  884. return rc;
  885. }
  886. /* Call this to tell the select thread to put us back into the read-ready signal set */
  887. static int add_to_select_set(Connection *conn)
  888. {
  889. conn->c_gettingber = 0;
  890. signal_listner();
  891. return 0;
  892. }
  893. static int remove_from_select_set(Connection *conn)
  894. {
  895. conn->c_gettingber = 1;
  896. return 0;
  897. }
  898. /* Helper functions from here on are platform-dependent */
  899. /* First the NT ones */
  900. #ifdef _WIN32
  901. static HANDLE completion_port = INVALID_HANDLE_VALUE;
  902. #define COMPKEY_DIE ((DWORD) -1L) /* used to kill off workers */
  903. static void push_back_data(Connection *conn, size_t offset, size_t length);
  904. static int queue_pushed_back_data(Connection *conn);
  905. /* Called when we've read from the completion queue, so there's data
  906. * waiting for us to pickup. We're told: the number of bytes read, the
  907. * address of the buffer, the state of this connection (new op, middle of op).
  908. */
  909. static int read_the_data(Connection *conn, int *process_op, int *defer_io, int *defer_pushback)
  910. {
  911. Conn_private *priv = conn->c_private;
  912. Operation *op = NULL;
  913. DWORD Bytes_Read = 0;
  914. char *Buffer = NULL;
  915. ber_tag_t tag = 0;
  916. int return_value = -1;
  917. ber_len_t ber_len = 0;
  918. ber_len_t Bytes_Scanned = 0;
  919. *defer_io = 0;
  920. *defer_pushback = 0;
  921. op = priv->c_current_op;
  922. Bytes_Read = priv->c_number_of_async_bytes_read;
  923. Buffer = priv->c_buffer + priv->c_buffer_offset;
  924. PR_ASSERT(NULL != op->o_ber);
  925. /* Is this an SSL connection ? */
  926. if (0 == (conn->c_flags & CONN_FLAG_SSL)) {
  927. /* Not SSL */
  928. if (! config_check_referral_mode()) {
  929. /* Update stats */
  930. PR_Lock( op_thread_lock );
  931. (*(g_get_global_snmp_vars()->ops_tbl.dsBytesRecv)) += Bytes_Read;
  932. PR_Unlock( op_thread_lock );
  933. }
  934. /* We need to read the data into the BER buffer */
  935. /* This can return a tag pr LBER_DEFAULT, indicating some error condition */
  936. tag = ber_get_next_buffer_ext( Buffer, Bytes_Read, &ber_len, op->o_ber, &Bytes_Scanned, conn->c_sb );
  937. if (LBER_DEFAULT == tag || LBER_OVERFLOW == tag)
  938. {
  939. if (0 == Bytes_Scanned)
  940. {
  941. /* Means we encountered an error---eg the client sent us pure crap---
  942. a bunch of bytes which we took to be a tag, length, then we ran off the
  943. end of the buffer. The next time we get here, we'll be returned LBER_DEFAULT
  944. This means that everything we've seen up till now is useless because it wasn't
  945. an LDAP message.
  946. So, we toss it away ! */
  947. if (LBER_OVERFLOW == tag) {
  948. slapi_log_error( SLAPI_LOG_FATAL, "connection",
  949. "conn=%" NSPRIu64 " fd=%d The length of BER Element was too long.\n",
  950. conn->c_connid, conn->c_sd );
  951. }
  952. PR_Lock( conn->c_mutex );
  953. connection_remove_operation( conn, op );
  954. operation_free(&op, conn);
  955. priv->c_current_op = NULL;
  956. PR_Unlock( conn->c_mutex );
  957. return -1; /* Abandon Connection */
  958. }
  959. }
  960. if (is_ber_too_big(conn,ber_len))
  961. {
  962. PR_Lock( conn->c_mutex );
  963. connection_remove_operation( conn, op );
  964. operation_free(&op, conn);
  965. priv->c_current_op = NULL;
  966. PR_Unlock( conn->c_mutex );
  967. return -1; /* Abandon Connection */
  968. }
  969. /* We set the flag to indicate that we'er in the middle of an op */
  970. priv->c_flags |= FLAG_CONN_HAD_SOME;
  971. /* Then we decide whether this is the last read for the current op */
  972. /* and set the flag accordingly */
  973. if (LBER_DEFAULT != tag) { /* we received a complete message */
  974. if (LDAP_TAG_MESSAGE == tag) { /* looks like an LDAP message */
  975. /* It's time to process this operation */
  976. *process_op = 1;
  977. priv->c_current_op = NULL;
  978. priv->c_flags = 0;
  979. } else {
  980. /*
  981. * We received a non-LDAP message. Log and close connection.
  982. */
  983. LDAPDebug( LDAP_DEBUG_ANY,
  984. "conn=%" NSPRIu64 " received a non-LDAP message"
  985. " (tag 0x%lx, expected 0x%lx)\n",
  986. conn->c_connid, tag, LDAP_TAG_MESSAGE );
  987. PR_Lock( conn->c_mutex );
  988. connection_remove_operation( conn, op );
  989. operation_free(&op, conn);
  990. priv->c_current_op = NULL;
  991. PR_Unlock( conn->c_mutex );
  992. return -1; /* Abandon Connection */
  993. }
  994. }
  995. /* Finally, mark whether there's the beginning of another operation remaining in the buffer */
  996. /* If there is, queue up another I/O completion request on the port to get it handled OK */
  997. /* If not, issue a new read on the socket. */
  998. if (Bytes_Scanned != Bytes_Read) {
  999. if (connection_increment_reference(conn) == -1) {
  1000. LDAPDebug(LDAP_DEBUG_CONNS,
  1001. "could not acquire lock in issue_new_read as conn %" NSPRIu64 " closing fd=%d\n",
  1002. conn->c_connid,conn->c_sd,0);
  1003. /* XXX how to handle this error? */
  1004. /* MAB: 25 Jan 01: let's try like this and pray this won't leak... */
  1005. /* GB : this should be OK because an error here
  1006. * means some other thread decided to close the
  1007. * connection, which mean a fatal error happened
  1008. * in that case just forget about the remaining
  1009. * data and return
  1010. */
  1011. return (0);
  1012. }
  1013. push_back_data(conn,priv->c_overlapped.Offset + Bytes_Scanned,Bytes_Read-Bytes_Scanned);
  1014. if (!conn->c_isreplication_session) {
  1015. if ((return_value = queue_pushed_back_data(conn)) == -1) {
  1016. /* MAB: 25 jan 01 we need to decrement the conn refcnt before leaving... Otherwise,
  1017. * this thread will unbalance the ref_cnt inc and dec for this connection
  1018. * and the result is that the connection is never closed and instead is kept
  1019. * forever an never released -> this was causing a fd starvation on NT
  1020. */
  1021. connection_decrement_reference(conn);
  1022. LDAPDebug(LDAP_DEBUG_CONNS,
  1023. "push_back_data failed: closing conn %" NSPRIu64 " fd=%d\n",
  1024. conn->c_connid,conn->c_sd,0);
  1025. }
  1026. } else {
  1027. /* Queue the I/O later to serialize */
  1028. *defer_pushback = 1;
  1029. return_value = 0;
  1030. }
  1031. } else {
  1032. priv->c_overlapped.Offset = 0;
  1033. if (!conn->c_isreplication_session) {
  1034. return_value = issue_new_read(conn);
  1035. } else {
  1036. /* Queue the I/O later to serialize */
  1037. *defer_io = 1;
  1038. return_value = 0;
  1039. }
  1040. }
  1041. } else {
  1042. /* SSL */
  1043. if ( (tag = ber_get_next( conn->c_sb, &ber_len, op->o_ber ))
  1044. != LDAP_TAG_MESSAGE ) {
  1045. return( -1 );
  1046. }
  1047. if(is_ber_too_big(conn,ber_len))
  1048. {
  1049. return( -1 );
  1050. }
  1051. /* Put this connection back into the read-ready signal state */
  1052. /* priv->c_flags |= FLAG_CONN_COMPLETE; Redundant now */
  1053. /* It's time to process this operation */
  1054. *process_op = 1;
  1055. priv->c_current_op = NULL;
  1056. priv->c_flags = 0;
  1057. return_value = 0;
  1058. if (!conn->c_isreplication_session) {
  1059. add_to_select_set(conn);
  1060. } else {
  1061. *defer_io = 1;
  1062. }
  1063. }
  1064. return return_value;
  1065. }
  1066. void push_back_data(Connection *conn, size_t offset, size_t length)
  1067. {
  1068. conn->c_private->c_overlapped.Offset = offset;
  1069. conn->c_private->c_deferred_length = length;
  1070. }
  1071. int queue_pushed_back_data(Connection *conn)
  1072. {
  1073. /* Use PostQueuedCompletionStatus() to push the data back up the pipe */
  1074. BOOL return_bool = FALSE;
  1075. return_bool = PostQueuedCompletionStatus(completion_port,conn->c_private->c_deferred_length,(DWORD)conn,&conn->c_private->c_overlapped);
  1076. if (return_bool) {
  1077. return 0;
  1078. } else {
  1079. return -1;
  1080. }
  1081. }
  1082. /* This function issues a new read operation on the connection.
  1083. * Called once we've finished reading everything from the buffer.
  1084. * VMS crusties will notice the similarity to $QIO.
  1085. */
  1086. int issue_new_read(Connection *conn)
  1087. {
  1088. BOOL return_bool = FALSE;
  1089. HANDLE socket = INVALID_HANDLE_VALUE;
  1090. void **buffer = NULL;
  1091. DWORD bytes_read = 0;
  1092. DWORD buffer_size = 0;
  1093. OVERLAPPED *overlapped = NULL;
  1094. PR_ASSERT(NULL != conn);
  1095. socket = (HANDLE)conn->c_sd;
  1096. PR_ASSERT(NULL != socket);
  1097. /* here we make sure that we have a buffer allocated */
  1098. buffer = &conn->c_private->c_buffer;
  1099. if (NULL == *buffer) {
  1100. *buffer = (void*)slapi_ch_malloc(LDAP_SOCKET_IO_BUFFER_SIZE);
  1101. if (NULL == *buffer) {
  1102. /* memory allocation failure */
  1103. return -1;
  1104. }
  1105. conn->c_private->c_buffer_size = LDAP_SOCKET_IO_BUFFER_SIZE;
  1106. }
  1107. buffer_size = conn->c_private->c_buffer_size;
  1108. overlapped = &conn->c_private->c_overlapped;
  1109. if (connection_increment_reference(conn) == -1) {
  1110. LDAPDebug(LDAP_DEBUG_CONNS,
  1111. "could not acquire lock in issue_new_read as conn %" NSPRIu64 " closing fd=%d\n",
  1112. conn->c_connid,conn->c_sd,0);
  1113. /* This means that the connection is closing */
  1114. return -1;
  1115. }
  1116. return_bool = ReadFile(socket,*buffer,buffer_size,&bytes_read,overlapped);
  1117. if ( !return_bool && ERROR_IO_PENDING != GetLastError( ) ) {
  1118. /* This means that the connection is shot for some reason */
  1119. connection_decrement_reference(conn);
  1120. return -1;
  1121. } else {
  1122. /* Our work is done, i/o read now queued */
  1123. return 0;
  1124. }
  1125. }
  1126. static int wait_on_new_work(Connection **ppConn, work_type *type)
  1127. {
  1128. /* Here, we wait on the I/O completion port for new data */
  1129. /* because we're not sure whether the completion port has been created yet,
  1130. * we wait 'till it has been.
  1131. */
  1132. Connection *temp_conn = NULL;
  1133. DWORD Bytes_Received = 0;
  1134. OVERLAPPED *pOverlapped = NULL;
  1135. BOOL return_bool = FALSE;
  1136. *type = read_data;
  1137. while ( (INVALID_HANDLE_VALUE == completion_port) && (!op_shutdown) ) {
  1138. Sleep(100);
  1139. }
  1140. while (1) {
  1141. if (op_shutdown) {
  1142. return EINTR;
  1143. }
  1144. return_bool = GetQueuedCompletionStatus(completion_port,&Bytes_Received,(DWORD*)&temp_conn,&pOverlapped,INFINITE);
  1145. if ((unsigned long)temp_conn == COMPKEY_DIE ) {
  1146. continue; /* kill this worker */
  1147. }
  1148. if (TRUE == return_bool) {
  1149. /* we successfully completed the I/O operation */
  1150. /* set the connection pointer the caller gave us to the one from the port */
  1151. PR_ASSERT(NULL != pOverlapped);
  1152. PR_ASSERT(NULL != temp_conn);
  1153. *ppConn = temp_conn;
  1154. /* store the # bytes read in the connection structure */
  1155. (*ppConn)->c_private->c_number_of_async_bytes_read = Bytes_Received;
  1156. (*ppConn)->c_private->c_buffer_offset = (*ppConn)->c_private->c_overlapped.Offset;
  1157. if( Bytes_Received == 0 )
  1158. {
  1159. /* 0 bytes received from a completed overlapped I/O
  1160. operation means the socket's been closed. */
  1161. break;
  1162. }
  1163. (*ppConn)->c_idlesince = current_time();
  1164. /* If we exit here, everything is OK */
  1165. connection_decrement_reference(temp_conn);
  1166. return 0;
  1167. }
  1168. if ( (FALSE == return_bool) && (NULL == pOverlapped) ) {
  1169. /* we timed out */
  1170. /* slapi_log_error( SLAPI_LOG_FATAL, "connection",
  1171. "GetQueuedCompletionStatus call timed out\n");*/
  1172. continue;
  1173. }
  1174. if ( (FALSE == return_bool) && (NULL != pOverlapped)) {
  1175. /* signifies some sort of i/o error, most likely an abortive close */
  1176. /* slapi_log_error( SLAPI_LOG_FATAL, "connection",
  1177. "GetQueuedCompletionStatus call failed; error - %ld\n", GetLastError());*/
  1178. if (NULL != temp_conn) {
  1179. /* If we were told the connection, return it--otherwise we can't tell which connection to close */
  1180. *ppConn = temp_conn;
  1181. }
  1182. break;
  1183. }
  1184. }
  1185. return EPIPE; /* we failed to read for some reason */
  1186. }
  1187. int connection_new_private(Connection *conn)
  1188. {
  1189. /* first add to the completion port */
  1190. DWORD threads = 10; /* DBDB hackhack */
  1191. HANDLE socket = INVALID_HANDLE_VALUE;
  1192. HANDLE return_port = NULL;
  1193. Conn_private *priv = NULL;
  1194. int return_value = -1;
  1195. PR_ASSERT(NULL != conn);
  1196. socket = (HANDLE) conn->c_sd;
  1197. /* make the private data if it isn't already there */
  1198. if (NULL == conn->c_private) {
  1199. Conn_private *new_private = (Conn_private *)slapi_ch_malloc(sizeof(Conn_private));
  1200. if (NULL == new_private) {
  1201. /* memory allocation failed */
  1202. return -1;
  1203. }
  1204. conn->c_private = new_private;
  1205. ZeroMemory(conn->c_private,sizeof(Conn_private));
  1206. }
  1207. priv = conn->c_private;
  1208. /* Make sure the private structure is cleared */
  1209. /* Note: you must modify this code if the contents
  1210. * of the structure are changed---we can't simply
  1211. * zero the structure because we want to preserve the
  1212. * buffer. IMPORTANT---here we reuse the I/O buffer
  1213. * from before. This is deliberate, to avoid mallocing again */
  1214. ZeroMemory(&(priv->c_overlapped),sizeof(OVERLAPPED));
  1215. priv->c_number_of_async_bytes_read = 0;
  1216. priv->c_buffer_offset = 0;
  1217. priv->c_flags = 0;
  1218. priv->c_current_op = NULL;
  1219. if (INVALID_HANDLE_VALUE == completion_port) {
  1220. /* completion port not yet setup, we need to make it */
  1221. completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
  1222. if (NULL == completion_port) {
  1223. LDAPDebug(LDAP_DEBUG_ANY,"Failed to create master I/O completion port\n",0,0,0);
  1224. return -1;
  1225. }
  1226. }
  1227. /* If the connection is SSL, don't do the right thing */
  1228. if (0 == (conn->c_flags & CONN_FLAG_SSL)) {
  1229. return_port = CreateIoCompletionPort(socket,completion_port,(DWORD)conn,0);
  1230. if (NULL == return_port) {
  1231. LDAPDebug(LDAP_DEBUG_ANY,"Failed to associate socket with I/O completion port, fd=%d,GetLastError = %d\n",socket,GetLastError(),0);
  1232. return -1;
  1233. }
  1234. /* Now queue the initial read on this connection */
  1235. return_value = issue_new_read(conn);
  1236. } else {
  1237. return_value = 0;
  1238. }
  1239. return return_value;
  1240. }
  1241. /* If all is well, this only gets called for SSL connections */
  1242. int connection_activity(Connection *conn)
  1243. {
  1244. /* First check that this really is an SSL connection */
  1245. if (0 == (conn->c_flags & CONN_FLAG_SSL)) {
  1246. return -1;
  1247. }
  1248. /* Now, the plan here is to push something up the IOCP pipe */
  1249. /* We need to fake something up so that the code which pulls
  1250. * it off the queue does the right thing. Here's what we do:
  1251. * We just call PostQueuedCompletionStatus like normal.
  1252. * The connection is marked as SSL, and it is this that the
  1253. * reading code notices. Simple !
  1254. */
  1255. /* Also, we need to participate in the signaling protocol to the select thread */
  1256. remove_from_select_set(conn);
  1257. /* We hold the lock already, increment the reference count, which will
  1258. be decremented in wait_for_new_work(). */
  1259. if (connection_acquire_nolock (conn) == -1) {
  1260. LDAPDebug(LDAP_DEBUG_CONNS,
  1261. "could not acquire lock in connection_activity as conn %" NSPRIu64 " closing fd=%d\n",
  1262. conn->c_connid,conn->c_sd,0);
  1263. /* XXX how to handle this error? */
  1264. /* MAB: 25 Jan 01: let's return on error and pray this won't leak */
  1265. return (-1);
  1266. }
  1267. push_back_data(conn, 0, 1);
  1268. return queue_pushed_back_data(conn);
  1269. }
  1270. static int finished_chomping(Connection *conn)
  1271. {
  1272. /* On NT we don't need to do anything here */
  1273. return 0;
  1274. }
  1275. #else /* WIN32/UNIX */
  1276. /*
  1277. * This is where the UNIX Helper functions would be if IO
  1278. * Completion Ports were supported on UNIX.
  1279. */
  1280. #endif /* WIN32/UNIX */
  1281. #else /* LDAP_IOCP */
  1282. /*
  1283. * IO Completion Ports are not available on this platform.
  1284. */
  1285. static int counter= 0; /* JCM Dumb Name */
  1286. /* The connection private structure for UNIX turbo mode */
  1287. struct Conn_private
  1288. {
  1289. int turbo_flag; /* set if we are currently in turbo mode */
  1290. int previous_op_count; /* the operation counter value last time we sampled it, used to compute operation rate */
  1291. int operation_rate; /* rate (ops/sample period) at which this connection has been processing operations */
  1292. time_t previous_count_check_time; /* The wall clock time we last sampled the operation count */
  1293. size_t c_buffer_size; /* size of the socket read buffer */
  1294. char *c_buffer; /* pointer to the socket read buffer */
  1295. size_t c_buffer_bytes; /* number of bytes currently stored in the buffer */
  1296. size_t c_buffer_offset; /* offset to the location of new data in the buffer */
  1297. };
  1298. int
  1299. connection_new_private(Connection *conn)
  1300. {
  1301. if (NULL == conn->c_private) {
  1302. Conn_private *new_private = (Conn_private *)slapi_ch_calloc(1,sizeof(Conn_private));
  1303. if (NULL == new_private) {
  1304. /* memory allocation failed */
  1305. return -1;
  1306. }
  1307. conn->c_private = new_private;
  1308. }
  1309. /* The c_buffer is supposed to be NULL here, cleaned by connection_cleanup,
  1310. double check to avoid memory leak */
  1311. if (NULL == conn->c_private->c_buffer) {
  1312. conn->c_private->c_buffer = (char*)slapi_ch_malloc(LDAP_SOCKET_IO_BUFFER_SIZE);
  1313. if (NULL == conn->c_private->c_buffer) {
  1314. /* memory allocation failure */
  1315. return -1;
  1316. }
  1317. conn->c_private->c_buffer_size = LDAP_SOCKET_IO_BUFFER_SIZE;
  1318. }
  1319. /*
  1320. * Clear the private structure, preserving the buffer and length in
  1321. * case we are reusing the buffer.
  1322. */
  1323. {
  1324. char *c_buffer = conn->c_private->c_buffer;
  1325. size_t c_buffer_size = conn->c_private->c_buffer_size;;
  1326. memset( conn->c_private, 0, sizeof(Conn_private));
  1327. conn->c_private->c_buffer = c_buffer;
  1328. conn->c_private->c_buffer_size = c_buffer_size;
  1329. }
  1330. return 0;
  1331. }
  1332. static void
  1333. connection_free_private_buffer(Connection *conn)
  1334. {
  1335. if (NULL != conn->c_private) {
  1336. slapi_ch_free((void*)&(conn->c_private->c_buffer));
  1337. }
  1338. }
  1339. /*
  1340. * Turbo Mode:
  1341. * Turbo Connection Mode is designed to more efficiently
  1342. * serve a small number of highly active connections performing
  1343. * mainly search operations. It is only used on UNIX---completion
  1344. * ports on NT make it unnecessary.
  1345. * A connection can be in turbo mode, or not in turbo mode.
  1346. * For non-turbo mode, the code path is the same as was before:
  1347. * worker threads wait on a condition variable for work.
  1348. * When they awake they consult the operation queue for
  1349. * something to do, read the operation from the connection's socket,
  1350. * perform the operation and go back to waiting on the condition variable.
  1351. * In Turbo Mode, a worker thread becomes associated with a connection.
  1352. * It then waits not on the condition variable, but directly on read ready
  1353. * state on the connection's socket. When new data arrives, it decodes
  1354. * the operation and executes it, and then goes back to read another
  1355. * operation from the same socket, or block waiting on new data.
  1356. * The read is done non-blocking, wait in poll with a timeout.
  1357. *
  1358. * There is a mechanism to ensure that only the most active
  1359. * connections are in turbo mode at any time. If this were not
  1360. * the case we could starve out some client operation requests
  1361. * due to waiting on I/O in many turbo threads at the same time.
  1362. *
  1363. * Each worker thread periodically (every CONN_TURBO_CHECK_INTERVAL seconds) examines
  1364. * the activity level for the connection it is processing.
  1365. * This applies regardless of whether the connection is
  1366. * currently in turbo mode or not. Activity is measured as
  1367. * the number of operations initiated since the last check was done.
  1368. * The N connections with the highest activity level are allowed
  1369. * to enter turbo mode. If the current connection is in the top N,
  1370. * then we decide to enter turbo mode. If the current connection
  1371. * is no longer in the top N, then we leave turbo mode.
  1372. * The decision to enter or leave turbo mode is taken under
  1373. * the connection mutex, preventing race conditions where
  1374. * more than one thread can change the turbo state of a connection
  1375. * concurrently.
  1376. */
  1377. /* Connection status values returned by
  1378. connection_wait_for_new_pb(), connection_read_operation(), etc. */
  1379. #define CONN_FOUND_WORK_TO_DO 0 /* an operation is available to process */
  1380. #define CONN_SHUTDOWN 1 /* op_shutdown was observed; the worker should exit */
  1381. #define CONN_NOWORK 2 /* woken up, but the work queue turned out to be empty */
  1382. #define CONN_DONE 3 /* connection is closed/closing or was disconnected on error */
  1383. #define CONN_TIMEDOUT 4 /* poll timed out before the first byte of a new PDU arrived */
  /* Turbo mode tuning knobs. */
  1384. #define CONN_TURBO_TIMEOUT_INTERVAL 1000 /* milliseconds; PR_Poll timeout used by connection_read_operation() */
  1385. #define CONN_TURBO_CHECK_INTERVAL 5 /* seconds; minimum age of the last activity check before connection_threadmain() re-evaluates turbo mode */
  1386. #define CONN_TURBO_PERCENTILE 50 /* proportion of threads allowed to be in turbo mode (percentage of the active thread count) */
  1387. #define CONN_TURBO_HYSTERESIS 0 /* avoid flip flopping in and out of turbo mode; rank margin applied in connection_enter_leave_turbo(), 0 means no margin */
  1388. int connection_wait_for_new_pb(Slapi_PBlock **ppb, PRIntervalTime interval)
  1389. {
  1390. int ret = CONN_FOUND_WORK_TO_DO;
  1391. PR_Lock( op_thread_lock );
  1392. /* While there is no operation to do... */
  1393. while( counter < 1) {
  1394. /* Check if we should shutdown. */
  1395. if (op_shutdown) {
  1396. PR_Unlock( op_thread_lock );
  1397. return CONN_SHUTDOWN;
  1398. }
  1399. PR_WaitCondVar( op_thread_cv, interval);
  1400. }
  1401. /* There is some work to do. */
  1402. counter--;
  1403. PR_Unlock( op_thread_lock );
  1404. /* Get the next operation from the work queue. */
  1405. *ppb = get_pb();
  1406. if (*ppb == NULL) {
  1407. LDAPDebug( LDAP_DEBUG_ANY, "pb is null \n", 0, 0, 0 );
  1408. PR_Lock( op_thread_lock );
  1409. counter++;
  1410. PR_Unlock( op_thread_lock );
  1411. ret = CONN_NOWORK;
  1412. }
  1413. return ret;
  1414. }
  1415. void connection_make_new_pb(Slapi_PBlock **ppb, Connection *conn)
  1416. {
  1417. /* In the classic case, the pb is made in connection_activity() and then
  1418. queued. get_pb() dequeues it. So we can just make it ourselves here */
  1419. /* *ppb = (Slapi_PBlock *) slapi_ch_calloc( 1, sizeof(Slapi_PBlock) ); */
  1420. *ppb = slapi_pblock_new();
  1421. (*ppb)->pb_conn = conn;
  1422. (*ppb)->pb_op = operation_new( plugin_build_operation_action_bitmap( 0,
  1423. plugin_get_server_plg() ));
  1424. connection_add_operation( conn, (*ppb)->pb_op );
  1425. }
  1426. /*
  1427. * Utility function called by connection_read_operation(). This is a
  1428. * small wrapper on top of libldap's ber_get_next_buffer_ext().
  1429. *
  1430. * Return value:
  1431. * 0: Success
  1432. * case 1) If there was not enough data in the buffer to complete the
  1433. * message, go to the next cycle. In this case, bytes_scanned is set
  1434. * to a positive number and *tagp is set to LBER_DEFAULT.
  1435. * case 2) Complete. *tagp == (tag of the message) and bytes_scanned is
  1436. * set to a positive number.
  1437. * -1: Failure
  1438. * case 1) *tagp == LBER_OVERFLOW: the length is either bigger than
  1439. * ber_uint_t type or the value preset via
  1440. * LBER_SOCKBUF_OPT_MAX_INCOMING_SIZE option
  1441. * case 2) *tagp == LBER_DEFAULT: memory error or tag mismatch
  1442. */
  1443. static int
  1444. get_next_from_buffer( void *buffer, size_t buffer_size, ber_len_t *lenp,
  1445. ber_tag_t *tagp, BerElement *ber, Connection *conn )
  1446. {
  1447. PRErrorCode err = 0;
  1448. PRInt32 syserr = 0;
  1449. ber_len_t bytes_scanned = 0;
  1450. *lenp = 0;
  1451. *tagp = ber_get_next_buffer_ext( buffer, buffer_size, lenp, ber,
  1452. &bytes_scanned, conn->c_sb );
  1453. if ((LBER_OVERFLOW == *tagp || LBER_DEFAULT == *tagp) && 0 == bytes_scanned)
  1454. {
  1455. if (LBER_OVERFLOW == *tagp)
  1456. {
  1457. err = SLAPD_DISCONNECT_BER_TOO_BIG;
  1458. }
  1459. else
  1460. {
  1461. err = SLAPD_DISCONNECT_BAD_BER_TAG;
  1462. }
  1463. syserr = errno;
  1464. /* Bad stuff happened, like the client sent us some junk */
  1465. LDAPDebug( LDAP_DEBUG_CONNS,
  1466. "ber_get_next failed for connection %" NSPRIu64 "\n", conn->c_connid, 0, 0 );
  1467. /* reset private buffer */
  1468. conn->c_private->c_buffer_bytes = conn->c_private->c_buffer_offset = 0;
  1469. /* drop connection */
  1470. disconnect_server( conn, conn->c_connid, -1, err, syserr );
  1471. return -1;
  1472. }
  1473. /* success, or need to wait for more data */
  1474. conn->c_private->c_buffer_offset += bytes_scanned;
  1475. return 0;
  1476. }
  1477. /* Either read read data into the connection buffer, or fail with err set */
  1478. static int
  1479. connection_read_ldap_data(Connection *conn, PRInt32 *err)
  1480. {
  1481. int ret = 0;
  1482. /* Is SASL encryption enabled on this connection ? */
  1483. if (conn->c_sasl_io) {
  1484. /* If so, call the SASL I/O layer */
  1485. ret = sasl_recv_connection(conn,conn->c_private->c_buffer, conn->c_private->c_buffer_size,err);
  1486. } else
  1487. {
  1488. /* Otherwise, just call PRRecv() */
  1489. ret = PR_Recv(conn->c_prfd,conn->c_private->c_buffer,conn->c_private->c_buffer_size,0,PR_INTERVAL_NO_WAIT);
  1490. if (ret < 0) {
  1491. *err = PR_GetError();
  1492. }
  1493. }
  1494. return ret;
  1495. }
  1496. /* Upon returning from this function, we have either:
  1497. 1. Read a PDU successfully.
  1498. 2. Detected some error condition with the connection which requires closing it.
  1499. 3. In Turbo mode, we Timed out without seeing any data.
  1500. We also handle the case where we read ahead beyond the current PDU
  1501. by buffering the data and setting the 'remaining_data' flag.
  1502. */
  /*
   * Parameters:
   *   conn            connection to read from; raw bytes live in its private buffer
   *   op              operation whose o_ber receives the decoded element; o_msgid
   *                   and o_tag are filled in on success
   *   tag             out: on success, the tag peeked after the message id
   *   remaining_data  out: set to 1 when unconsumed bytes remain in the private
   *                   buffer; it is never cleared here, so the caller presets it
   * Returns:
   *   0 (CONN_FOUND_WORK_TO_DO) a request header was decoded
   *   CONN_DONE       the connection is closed/closing or was disconnected here
   *   CONN_TIMEDOUT   a poll timed out before any byte of a new PDU was seen
   *   CONN_SHUTDOWN   a poll timed out and op_shutdown is set
   */
  1503. int connection_read_operation(Connection *conn, Operation *op, ber_tag_t *tag, int *remaining_data)
  1504. {
  1505. ber_len_t len = 0;
  1506. int ret = 0;
  1507. int waits_done = 0;
  1508. ber_int_t msgid;
  1509. int new_operation = 1; /* Are we doing the first I/O read for a new operation ? */
  1510. char *buffer = conn->c_private->c_buffer;
  1511. PRErrorCode err = 0;
  1512. PRInt32 syserr = 0;
  1513. /*
  1514. * if the socket is still valid, get the ber element
  1515. * waiting for us on this connection. timeout is handled
  1516. * in the low-level [secure_]read_function.
  1517. */
  1518. if ( (conn->c_sd == SLAPD_INVALID_SOCKET) ||
  1519. (conn->c_flags & CONN_FLAG_CLOSING) ) {
  1520. return CONN_DONE;
  1521. }
  1522. /* See if we should enable SASL I/O for this connection */
  1523. if (conn->c_enable_sasl_io) {
  1524. ret = sasl_io_setup(conn);
  1525. if (ret) {
  1526. LDAPDebug( LDAP_DEBUG_ANY,
  1527. "conn=%" NSPRIu64 " unable to enable SASL I/O\n", conn->c_connid, 0, 0 );
  1528. disconnect_server( conn, conn->c_connid, -1, SLAPD_DISCONNECT_BAD_BER_TAG, EPROTO );
  1529. return CONN_DONE;
  1530. }
  1531. }
  /* LBER_DEFAULT doubles as the "no complete PDU yet" marker for the loop below */
  1532. *tag = LBER_DEFAULT;
  1533. /* First check to see if we have buffered data from "before" */
  1534. if (conn->c_private->c_buffer_bytes - conn->c_private->c_buffer_offset) {
  1535. /* If so, use that data first */
  1536. if ( 0 != get_next_from_buffer( buffer
  1537. + conn->c_private->c_buffer_offset,
  1538. conn->c_private->c_buffer_bytes
  1539. - conn->c_private->c_buffer_offset,
  1540. &len, tag, op->o_ber, conn )) {
  1541. return CONN_DONE;
  1542. }
  /* part of this PDU has been consumed, so a later poll timeout must not report CONN_TIMEDOUT */
  1543. new_operation = 0;
  1544. }
  1545. /* If we still haven't seen a complete PDU, read from the network */
  1546. while (*tag == LBER_DEFAULT) {
  /* ioblock timeout expressed as a number of poll intervals (presumably the config value is in milliseconds - TODO confirm) */
  1547. int ioblocktimeout_waits = config_get_ioblocktimeout() / CONN_TURBO_TIMEOUT_INTERVAL;
  1548. /* We should never get here with data remaining in the buffer */
  1549. PR_ASSERT( !new_operation || 0 == (conn->c_private->c_buffer_bytes - conn->c_private->c_buffer_offset) );
  1550. /* We make a non-blocking read call */
  1551. ret = connection_read_ldap_data(conn,&err);
  1552. if (ret <= 0) {
  1553. if (0 == ret) {
  1554. /* Connection is closed */
  1555. PR_Lock( conn->c_mutex );
  1556. disconnect_server_nomutex( conn, conn->c_connid, -1, SLAPD_DISCONNECT_BAD_BER_TAG, 0 );
  1557. conn->c_gettingber = 0;
  1558. PR_Unlock( conn->c_mutex );
  1559. signal_listner();
  1560. return CONN_DONE;
  1561. }
  1562. /* err = PR_GetError(); */
  1563. /* If we would block, we need to poll for a while */
  1564. syserr = PR_GetOSError();
  1565. if ( SLAPD_PR_WOULD_BLOCK_ERROR( err ) ||
  1566. SLAPD_SYSTEM_WOULD_BLOCK_ERROR( syserr ) ) {
  1567. struct PRPollDesc pr_pd;
  1568. PRIntervalTime timeout = PR_MillisecondsToInterval(CONN_TURBO_TIMEOUT_INTERVAL);
  1569. pr_pd.fd = (PRFileDesc *)conn->c_prfd;
  1570. pr_pd.in_flags = PR_POLL_READ;
  1571. pr_pd.out_flags = 0;
  1572. ret = PR_Poll(&pr_pd, 1, timeout);
  /* counted for every poll, whether or not it timed out; reset once data arrives */
  1573. waits_done++;
  1574. /* Did we time out ? */
  1575. if (0 == ret) {
  1576. /* We timed out, should the server shutdown ? */
  1577. if (op_shutdown) {
  1578. return CONN_SHUTDOWN;
  1579. }
  1580. /* We timed out, is this the first read in a PDU ? */
  1581. if (new_operation) {
  1582. /* If so, we return */
  1583. return CONN_TIMEDOUT;
  1584. } else {
  1585. /* Otherwise we loop, unless we exceeded the ioblock timeout */
  1586. if (waits_done > ioblocktimeout_waits) {
  1587. LDAPDebug( LDAP_DEBUG_CONNS,"ioblock timeout expired on connection %" NSPRIu64 "\n", conn->c_connid, 0, 0 );
  1588. disconnect_server( conn, conn->c_connid, -1,
  1589. SLAPD_DISCONNECT_IO_TIMEOUT, 0 );
  1590. return CONN_DONE;
  1591. } else {
  1592. /* The turbo mode may cause threads starvation.
  1593. Do a yield here to reduce the starving.
  1594. */
  1595. PR_Sleep(PR_INTERVAL_NO_WAIT);
  1596. continue;
  1597. }
  1598. }
  1599. }
  1600. if (-1 == ret) {
  1601. /* PR_Poll call failed */
  1602. err = PR_GetError();
  1603. syserr = PR_GetOSError();
  1604. LDAPDebug( LDAP_DEBUG_ANY,
  1605. "PR_Poll for connection %" NSPRIu64 " returns %d (%s)\n", conn->c_connid, err, slapd_pr_strerror( err ) );
  1606. /* If this happens we should close the connection */
  1607. disconnect_server( conn, conn->c_connid, -1, err, syserr );
  1608. return CONN_DONE;
  1609. }
  /* poll reported activity: fall out and retry the read at the top of the loop */
  1610. } else {
  1611. /* Some other error, typically meaning bad stuff */
  1612. syserr = PR_GetOSError();
  1613. LDAPDebug( LDAP_DEBUG_CONNS,
  1614. "PR_Recv for connection %" NSPRIu64 " returns %d (%s)\n", conn->c_connid, err, slapd_pr_strerror( err ) );
  1615. /* If this happens we should close the connection */
  1616. disconnect_server( conn, conn->c_connid, -1, err, syserr );
  1617. return CONN_DONE;
  1618. }
  1619. } else {
  1620. /* We read some data off the network, do something with it */
  1621. conn->c_private->c_buffer_bytes = ret;
  1622. conn->c_private->c_buffer_offset = 0;
  1623. if ( get_next_from_buffer( buffer,
  1624. conn->c_private->c_buffer_bytes
  1625. - conn->c_private->c_buffer_offset,
  1626. &len, tag, op->o_ber, conn ) != 0 ) {
  1627. return CONN_DONE;
  1628. }
  1629. new_operation = 0;
  /* ret held the byte count; clear it so a completed PDU returns 0 */
  1630. ret = 0;
  1631. waits_done = 0; /* got some data: reset counter */
  1632. }
  1633. }
  1634. /* If there is remaining buffered data, set the flag to tell the caller */
  1635. if (conn->c_private->c_buffer_bytes - conn->c_private->c_buffer_offset) {
  1636. *remaining_data = 1;
  1637. }
  1638. if ( *tag != LDAP_TAG_MESSAGE ) {
  1639. /*
  1640. * We received a non-LDAP message. Log and close connection.
  1641. */
  1642. LDAPDebug( LDAP_DEBUG_ANY,
  1643. "conn=%" NSPRIu64 " received a non-LDAP message (tag 0x%lx, expected 0x%lx)\n",
  1644. conn->c_connid, *tag, LDAP_TAG_MESSAGE );
  1645. disconnect_server( conn, conn->c_connid, -1,
  1646. SLAPD_DISCONNECT_BAD_BER_TAG, EPROTO );
  1647. return CONN_DONE;
  1648. }
  /* the message id is the first element inside the LDAP message envelope */
  1649. if ( (*tag = ber_get_int( op->o_ber, &msgid ))
  1650. != LDAP_TAG_MSGID ) {
  1651. /* log, close and send error */
  1652. LDAPDebug( LDAP_DEBUG_ANY,
  1653. "conn=%" NSPRIu64 " unable to read tag for incoming request\n", conn->c_connid, 0, 0 );
  1654. disconnect_server( conn, conn->c_connid, -1, SLAPD_DISCONNECT_BAD_BER_TAG, EPROTO );
  1655. return CONN_DONE;
  1656. }
  1657. if(is_ber_too_big(conn,len))
  1658. {
  1659. disconnect_server( conn, conn->c_connid, -1, SLAPD_DISCONNECT_BER_TOO_BIG, 0 );
  1660. return CONN_DONE;
  1661. }
  1662. op->o_msgid = msgid;
  /* peek (do not consume) the request tag; len is overwritten with that element's length */
  1663. *tag = ber_peek_tag( op->o_ber, &len );
  1664. switch ( *tag ) {
  1665. case LBER_ERROR:
  1666. case LDAP_TAG_LDAPDN: /* optional username, for CLDAP */
  1667. /* log, close and send error */
  1668. LDAPDebug( LDAP_DEBUG_ANY,
  1669. "conn=%" NSPRIu64 " ber_peek_tag returns 0x%lx\n", conn->c_connid, *tag, 0 );
  1670. disconnect_server( conn, conn->c_connid, -1, SLAPD_DISCONNECT_BER_PEEK, EPROTO );
  1671. return CONN_DONE;
  1672. default:
  1673. break;
  1674. }
  1675. op->o_tag = *tag;
  1676. return ret;
  1677. }
  1678. void connection_make_readable(Connection *conn)
  1679. {
  1680. PR_Lock( conn->c_mutex );
  1681. conn->c_gettingber = 0;
  1682. PR_Unlock( conn->c_mutex );
  1683. signal_listner();
  1684. }
  1685. /*
  1686. * Figure out the operation completion rate for this connection
  1687. */
  1688. void connection_check_activity_level(Connection *conn)
  1689. {
  1690. int current_count = 0;
  1691. int delta_count = 0;
  1692. PR_Lock( conn->c_mutex );
  1693. /* get the current op count */
  1694. current_count = conn->c_opscompleted;
  1695. /* compare to the previous op count */
  1696. delta_count = current_count - conn->c_private->previous_op_count;
  1697. /* delta is the rate, store that */
  1698. conn->c_private->operation_rate = delta_count;
  1699. /* store current count in the previous count slot */
  1700. conn->c_private->previous_op_count = current_count;
  1701. /* update the last checked time */
  1702. conn->c_private->previous_count_check_time = current_time();
  1703. PR_Unlock( conn->c_mutex );
  1704. LDAPDebug(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " activity level = %d\n",conn->c_connid,delta_count,0);
  1705. }
  /* Accumulator handed to table_iterate_function() while walking the active connections. */
  1706. typedef struct table_iterate_info_struct {
  1707. int connection_count; /* number of connections visited */
  1708. int rank_count; /* number of visited connections whose operation_rate exceeds our_rate */
  1709. int our_rate; /* operation_rate of the connection being ranked */
  1710. } table_iterate_info;
  1711. int table_iterate_function(Connection *conn, void *arg)
  1712. {
  1713. int ret = 0;
  1714. table_iterate_info *pinfo = (table_iterate_info*)arg;
  1715. pinfo->connection_count++;
  1716. if (conn->c_private->operation_rate > pinfo->our_rate) {
  1717. pinfo->rank_count++;
  1718. }
  1719. return ret;
  1720. }
  1721. /*
  1722. * Scan the list of active connections, evaluate our relative rank
  1723. * for connection activity.
  1724. */
  1725. void connection_find_our_rank(Connection *conn,int *connection_count, int *our_rank)
  1726. {
  1727. table_iterate_info info = {0};
  1728. info.our_rate = conn->c_private->operation_rate;
  1729. connection_table_iterate_active_connections(the_connection_table, &info, &table_iterate_function);
  1730. *connection_count = info.connection_count;
  1731. *our_rank = info.rank_count;
  1732. }
  1733. /*
  1734. * Evaluate the turbo policy for this connection
  1735. */
  1736. void connection_enter_leave_turbo(Connection *conn, int *new_turbo_flag)
  1737. {
  1738. int current_mode = 0;
  1739. int new_mode = 0;
  1740. int connection_count = 0;
  1741. int our_rank = 0;
  1742. int threshold_rank = 0;
  1743. PR_Lock(conn->c_mutex);
  1744. /* We can already be in turbo mode, or not */
  1745. current_mode = conn->c_private->turbo_flag;
  1746. if(conn->c_private->operation_rate == 0) {
  1747. /* The connection is ranked by the passed activities. If some other connection have more activity,
  1748. increase rank by one. The highest rank is least activity, good candidates to move out of turbo mode.
  1749. However, if no activity on all the connections, then every connection gets 0 rank, so none move out.
  1750. No bother to do so much calcuation, short-cut to non-turbo mode if no activities in passed interval */
  1751. new_mode = 0;
  1752. } else {
  1753. double activet = 0.0;
  1754. connection_find_our_rank(conn,&connection_count, &our_rank);
  1755. LDAPDebug(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " turbo rank = %d out of %d conns\n",conn->c_connid,our_rank,connection_count);
  1756. activet = (double)g_get_active_threadcnt();
  1757. threshold_rank = (int)(activet * ((double)CONN_TURBO_PERCENTILE / 100.0));
  1758. /* adjust threshold_rank according number of connections,
  1759. less turbo threads as more connections,
  1760. one measure to reduce thread startvation.
  1761. */
  1762. if (connection_count > threshold_rank) {
  1763. threshold_rank -= (connection_count - threshold_rank) / 5;
  1764. }
  1765. if (current_mode) {
  1766. /* We're currently in turbo mode */
  1767. /* Policy says that we stay in turbo mode provided
  1768. connection activity is still high.
  1769. */
  1770. if (our_rank - CONN_TURBO_HYSTERESIS < threshold_rank) {
  1771. /* Stay in turbo mode */
  1772. new_mode = 1;
  1773. } else {
  1774. /* Exit turbo mode */
  1775. new_mode = 0;
  1776. }
  1777. } else {
  1778. /* We're currently not in turbo mode */
  1779. /* Policy says that we go into turbo mode if
  1780. recent connection activity is high.
  1781. */
  1782. if (our_rank + CONN_TURBO_HYSTERESIS < threshold_rank) {
  1783. /* Enter turbo mode */
  1784. new_mode = 1;
  1785. } else {
  1786. /* Stay out of turbo mode */
  1787. new_mode = 0;
  1788. }
  1789. }
  1790. }
  1791. conn->c_private->turbo_flag = new_mode;
  1792. PR_Unlock(conn->c_mutex);
  1793. if (current_mode != new_mode) {
  1794. if (current_mode) {
  1795. LDAPDebug(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " leaving turbo mode\n",conn->c_connid,0,0);
  1796. } else {
  1797. LDAPDebug(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " entering turbo mode\n",conn->c_connid,0,0);
  1798. }
  1799. }
  1800. *new_turbo_flag = new_mode;
  1801. }
  /*
   * Worker thread main loop. Each iteration:
   *   1. obtains a pblock: from the global work queue in the normal case, or
   *      by building one on the connection handled in the previous iteration
   *      when the thread is in turbo mode or more_data is set;
   *   2. reads one operation header with connection_read_operation();
   *   3. re-evaluates turbo mode when the last activity check is older than
   *      CONN_TURBO_CHECK_INTERVAL seconds;
   *   4. dispatches the operation (unless it is rejected first);
   *   5. at "done", updates counters, removes the operation, and releases the
   *      connection unless turbo mode or leftover buffered data keeps this
   *      thread attached to it.
   * The function returns, after g_decr_active_threadcnt(), once shutdown is
   * detected.
   */
  1802. static void
  1803. connection_threadmain()
  1804. {
  1805. Slapi_PBlock *pb = NULL;
  1806. PRIntervalTime interval = PR_SecondsToInterval(10);
  1807. Connection *conn = NULL;
  1808. Operation *op;
  1809. ber_tag_t tag = 0;
  1810. int need_wakeup;
  1811. int thread_turbo_flag = 0;
  1812. int ret = 0;
  1813. int more_data = 0;
  1814. int replication_connection = 0; /* If this connection is from a replication supplier, we want to ensure that operation processing is serialized */
  1815. #if defined( OSF1 ) || defined( hpux )
  1816. /* Arrange to ignore SIGPIPE signals. */
  1817. SIGNAL( SIGPIPE, SIG_IGN );
  1818. #endif
  1819. while (1) {
  1820. int is_timedout = 0;
  1821. if( op_shutdown ) {
  1822. LDAPDebug( LDAP_DEBUG_TRACE,
  1823. "op_thread received shutdown signal\n", 0, 0, 0 );
  1824. g_decr_active_threadcnt();
  1825. return;
  1826. }
  1827. if (!thread_turbo_flag && (NULL == pb) && !more_data) {
  1828. /* If more data is left from the previous connection_read_operation,
  1829. we should finish the op now. Client might be thinking it's
  1830. done sending the request and wait for the response forever.
  1831. [blackflag 624234] */
  1832. ret = connection_wait_for_new_pb(&pb,interval);
  1833. switch (ret) {
  1834. case CONN_NOWORK:
  1835. continue;
  1836. case CONN_SHUTDOWN:
  1837. LDAPDebug( LDAP_DEBUG_TRACE,
  1838. "op_thread received shutdown signal\n", 0, 0, 0 );
  1839. g_decr_active_threadcnt();
  1840. return;
  1841. case CONN_FOUND_WORK_TO_DO:
  1842. default:
  1843. break;
  1844. }
  1845. } else if (NULL == pb) {
  /* turbo mode or buffered data: conn still refers to the connection handled in the previous iteration */
  1846. /* The turbo mode may cause threads starvation.
  1847. Do a yield here to reduce the starving
  1848. */
  1849. PR_Sleep(PR_INTERVAL_NO_WAIT);
  1850. PR_Lock(conn->c_mutex);
  1851. /* Make our own pb in turbo mode */
  1852. connection_make_new_pb(&pb,conn);
  1853. PR_Unlock(conn->c_mutex);
  /* same counter updates that connection_activity() performs for queued pblocks */
  1854. if (! config_check_referral_mode()) {
  1855. slapi_counter_increment(ops_initiated);
  1856. slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps);
  1857. }
  1858. }
  1859. /* Once we're here we have a pb */
  1860. conn = pb->pb_conn;
  1861. op = pb->pb_op;
  /* connection_read_operation() only ever sets this flag, so clear it first */
  1862. more_data = 0;
  1863. ret = connection_read_operation(conn,op,&tag,&more_data);
  1864. #define DB_PERF_TURBO 1
  1865. #if defined(DB_PERF_TURBO)
  1866. /* If it's been a while since we last did it ... */
  1867. if (current_time() - conn->c_private->previous_count_check_time > CONN_TURBO_CHECK_INTERVAL) {
  1868. int new_turbo_flag = 0;
  1869. /* Check the connection's activity level */
  1870. connection_check_activity_level(conn);
  1871. /* And if appropriate, change into or out of turbo mode */
  1872. connection_enter_leave_turbo(conn,&new_turbo_flag);
  1873. thread_turbo_flag = new_turbo_flag;
  1874. }
  1875. /* turn off turbo mode immediately if any pb waiting in global queue */
  /* NOTE(review): counter is read here without op_thread_lock; only the thread-local flag is cleared, not conn->c_private->turbo_flag */
  1876. if (thread_turbo_flag && (counter > 0)) {
  1877. thread_turbo_flag = 0;
  1878. LDAPDebug(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " leaving turbo mode\n",conn->c_connid,0,0);
  1879. }
  1880. #endif
  1881. switch (ret) {
  1882. case CONN_DONE:
  1883. /* This means that the connection was closed, so clear turbo mode */
  1884. /*FALLTHROUGH*/
  1885. case CONN_TIMEDOUT:
  1886. thread_turbo_flag = 0;
  1887. is_timedout = 1;
  1888. /* note:
  1889. * should call connection_make_readable after the op is removed
  1890. * connection_make_readable(conn);
  1891. */
  1892. goto done;
  1893. case CONN_SHUTDOWN:
  1894. LDAPDebug( LDAP_DEBUG_TRACE,
  1895. "op_thread received shutdown signal\n", 0, 0, 0 );
  1896. g_decr_active_threadcnt();
  1897. return;
  1898. default:
  1899. break;
  1900. }
  1901. /*
  1902. * Do not put the connection back to the read ready poll list
  1903. * if the operation is unbind. Unbind will close the socket.
  1904. * Similarly, if we are in turbo mode, don't send the socket
  1905. * back to the poll set.
  1906. * more_data: [blackflag 624234]
  1907. * If the connection is from a replication supplier, don't make it readable here.
  1908. * We want to ensure that replication operations are processed strictly in the order
  1909. * they are received off the wire.
  1910. */
  1911. replication_connection = conn->c_isreplication_session;
  1912. if (tag != LDAP_REQ_UNBIND && (!thread_turbo_flag) && !more_data && !replication_connection) {
  1913. connection_make_readable(conn);
  1914. }
  1915. /* are we in referral-only mode? */
  1916. if (config_check_referral_mode() && tag != LDAP_REQ_UNBIND) {
  1917. referral_mode_reply(pb);
  1918. goto done;
  1919. }
  1920. /* check if new password is required */
  1921. if(connection_need_new_password(conn, op, pb)) {
  1922. goto done;
  1923. }
  1924. /* if this is a bulk import, only "add" and "import done"
  1925. * are allowed */
  1926. if (conn->c_flags & CONN_FLAG_IMPORT) {
  1927. if ((tag != LDAP_REQ_ADD) && (tag != LDAP_REQ_EXTENDED)) {
  1928. /* no cookie for you. */
  1929. LDAPDebug(LDAP_DEBUG_ANY, "Attempted operation %d "
  1930. "from within bulk import\n",
  1931. tag, 0, 0);
  1932. slapi_send_ldap_result(pb, LDAP_PROTOCOL_ERROR, NULL,
  1933. NULL, 0, NULL);
  1934. goto done;
  1935. }
  1936. }
  1937. /*
  1938. * Call the do_<operation> function to process this request.
  1939. */
  1940. connection_dispatch_operation(conn, op, pb);
  1941. done:
  1942. /*
  1943. * done with this operation. delete it from the op
  1944. * queue for this connection, delete the number of
  1945. * threads devoted to this connection, and see if
  1946. * there's more work to do right now on this conn.
  1947. */
  /* NOTE(review): this path is also taken for CONN_DONE/CONN_TIMEDOUT, so the counters below are bumped even when no request was read */
  1948. /* number of ops on this connection */
  1949. PR_AtomicIncrement(&conn->c_opscompleted);
  1950. /* total number of ops for the server */
  1951. slapi_counter_increment(ops_completed);
  1952. /* If this op isn't a persistent search, remove it */
  1953. if ( !( pb->pb_op->o_flags & OP_FLAG_PS )) {
  1954. /* delete from connection operation queue & decr refcnt */
  1955. PR_Lock( conn->c_mutex );
  1956. connection_remove_operation( conn, op );
  1957. /* destroying the pblock will cause destruction of the operation
  1958. * so this must happend before releasing the connection
  1959. */
  1960. slapi_pblock_destroy( pb );
  1961. /* If we're in turbo mode, we keep our reference to the connection
  1962. alive */
  1963. if (!thread_turbo_flag && !more_data) {
  1964. connection_release_nolock (conn);
  1965. }
  1966. PR_Unlock( conn->c_mutex );
  1967. }
  1968. /* Since we didn't do so earlier, we need to make a replication connection readable again here */
  /* NOTE(review): on the CONN_DONE/CONN_TIMEDOUT path replication_connection still holds the value from an earlier iteration; is_timedout is 1 there, so the stale value does not change the outcome */
  1969. if ( ((1 == is_timedout) || (replication_connection && !thread_turbo_flag)) && !more_data)
  1970. connection_make_readable(conn);
  /* forget the pblock so the next iteration fetches or builds a new one */
  1971. pb = NULL;
  1972. if (!thread_turbo_flag && !more_data) { /* Don't do this in turbo mode */
  1973. PR_Lock( conn->c_mutex );
  1974. /* if the threadnumber of now below the maximum, wakeup
  1975. * the listener thread so that we start polling on this
  1976. * connection again
  1977. */
  1978. /* DBDB I think this code is bogus -- we already signaled the listener above here */
  1979. if (conn->c_threadnumber == config_get_maxthreadsperconn())
  1980. need_wakeup = 1;
  1981. else
  1982. need_wakeup = 0;
  1983. conn->c_threadnumber--;
  1984. PR_Unlock( conn->c_mutex );
  1985. if (need_wakeup)
  1986. signal_listner();
  1987. }
  1988. } /* while (1) */
  1989. }
  1990. /* thread need to hold conn->c_mutex before calling this function */
  1991. int
  1992. connection_activity(Connection *conn)
  1993. {
  1994. Slapi_PBlock *pb;
  1995. connection_make_new_pb(&pb, conn);
  1996. /* Add pb to the end of the work queue. */
  1997. add_pb( pb );
  1998. /* Check if exceed the max thread per connection. If so, increment
  1999. c_pbwait. Otherwise increment the counter and notify the cond. var.
  2000. there is work to do. */
  2001. if (connection_acquire_nolock (conn) == -1) {
  2002. LDAPDebug(LDAP_DEBUG_CONNS,
  2003. "could not acquire lock in connection_activity as conn %" NSPRIu64 " closing fd=%d\n",
  2004. conn->c_connid,conn->c_sd,0);
  2005. /* XXX how to handle this error? */
  2006. /* MAB: 25 Jan 01: let's return on error and pray this won't leak */
  2007. return (-1);
  2008. }
  2009. conn->c_gettingber = 1;
  2010. conn->c_threadnumber++;
  2011. PR_Lock( op_thread_lock );
  2012. counter++;
  2013. PR_NotifyCondVar( op_thread_cv );
  2014. PR_Unlock( op_thread_lock );
  2015. if (! config_check_referral_mode()) {
  2016. slapi_counter_increment(ops_initiated);
  2017. slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps);
  2018. }
  2019. return 0;
  2020. }
  2021. /* add_pb(): will add a pb to the end of the global work queue. The work queue
  2022. is implemented as a singal link list. */
  2023. static void
  2024. add_pb( Slapi_PBlock *pb)
  2025. {
  2026. struct Slapi_PBlock_q *new_pb=NULL;
  2027. LDAPDebug( LDAP_DEBUG_TRACE, "add_pb \n", 0, 0, 0 );
  2028. new_pb = (struct Slapi_PBlock_q *) slapi_ch_malloc ( sizeof( struct Slapi_PBlock_q ));
  2029. new_pb->pb = pb;
  2030. new_pb->next_pb =NULL;
  2031. PR_Lock( pb_q_lock );
  2032. if (last_pb == NULL) {
  2033. last_pb = new_pb;
  2034. first_pb = new_pb;
  2035. }
  2036. else {
  2037. last_pb->next_pb = new_pb;
  2038. last_pb = new_pb;
  2039. }
  2040. PR_Unlock( pb_q_lock );
  2041. }
  2042. /* get_pb(): will get a pb from the begining of the work queue, return NULL if
  2043. the queue is empty.*/
  2044. static Slapi_PBlock *
  2045. get_pb()
  2046. {
  2047. struct Slapi_PBlock_q *tmp = NULL;
  2048. Slapi_PBlock *pb;
  2049. LDAPDebug( LDAP_DEBUG_TRACE, "get_pb \n", 0, 0, 0 );
  2050. PR_Lock( pb_q_lock );
  2051. if (first_pb == NULL) {
  2052. PR_Unlock( pb_q_lock );
  2053. LDAPDebug( LDAP_DEBUG_ANY, "get_pb: the work queue is empty.\n",
  2054. 0, 0, 0 );
  2055. return NULL;
  2056. }
  2057. tmp = first_pb;
  2058. if ( first_pb == last_pb ) {
  2059. last_pb = NULL;
  2060. }
  2061. first_pb = tmp->next_pb;
  2062. PR_Unlock( pb_q_lock );
  2063. pb = tmp->pb;
  2064. /* Free the memory used by the pb found. */
  2065. slapi_ch_free ((void **)&tmp);
  2066. return (pb);
  2067. }
  2068. #endif /* LDAP_IOCP */
  2069. /* Helper functions common to both varieties of connection code: */
  2070. /* op_thread_cleanup() : This function is called by daemon thread when it gets
  2071. the slapd_shutdown signal. It will set op_shutdown to 1 and notify
  2072. all thread waiting on op_thread_cv to terminate. */
  2073. void
  2074. op_thread_cleanup()
  2075. {
  2076. #ifdef _WIN32
  2077. int i;
  2078. PRIntervalTime interval;
  2079. int max_threads = config_get_threadnumber();
  2080. interval = PR_SecondsToInterval(3);
  2081. #endif
  2082. LDAPDebug( LDAP_DEBUG_ANY,
  2083. "slapd shutting down - signaling operation threads\n", 0, 0, 0);
  2084. PR_Lock( op_thread_lock );
  2085. op_shutdown = 1;
  2086. PR_NotifyAllCondVar ( op_thread_cv );
  2087. PR_Unlock( op_thread_lock );
  2088. #ifdef _WIN32
  2089. LDAPDebug( LDAP_DEBUG_ANY,
  2090. "slapd shutting down - waiting for %d threads to terminate\n",
  2091. g_get_active_threadcnt(), 0, 0 );
  2092. /* kill off each worker waiting on GetQueuedCompletionStatus */
  2093. for ( i = 0; i < max_threads; ++ i )
  2094. {
  2095. PostQueuedCompletionStatus( completion_port, 0, COMPKEY_DIE ,0);
  2096. }
  2097. /* don't sleep: there's no reason to do so here DS_Sleep(interval); */ /* sleep 3 seconds */
  2098. #endif
  2099. }
  2100. static void
  2101. connection_add_operation(Connection* conn,Operation* op)
  2102. {
  2103. Operation **olist= &conn->c_ops;
  2104. int id= conn->c_opsinitiated++;
  2105. PRUint64 connid = conn->c_connid;
  2106. Operation **tmp;
  2107. /* slapi_ch_stop_recording(); */
  2108. for ( tmp = olist; *tmp != NULL; tmp = &(*tmp)->o_next )
  2109. ; /* NULL */
  2110. *tmp= op;
  2111. op->o_opid = id;
  2112. op->o_connid = connid;
  2113. /* Call the plugin extension constructors */
  2114. op->o_extension = factory_create_extension(get_operation_object_type(),op,conn);
  2115. }
  2116. /*
  2117. * Find an Operation on the Connection, and zap it in the butt.
  2118. * Call this function with conn->c_mutex locked.
  2119. */
  2120. void
  2121. connection_remove_operation( Connection *conn, Operation *op )
  2122. {
  2123. Operation **olist= &conn->c_ops;
  2124. Operation **tmp;
  2125. for ( tmp = olist; *tmp != NULL && *tmp != op; tmp = &(*tmp)->o_next )
  2126. ; /* NULL */
  2127. if ( *tmp == NULL )
  2128. {
  2129. LDAPDebug( LDAP_DEBUG_ANY, "connection_remove_operation: can't find op %d for conn %" NSPRIu64 "\n",
  2130. (int)op->o_msgid, conn->c_connid, 0 );
  2131. }
  2132. else
  2133. {
  2134. *tmp = (*tmp)->o_next;
  2135. }
  2136. }
  2137. /*
  2138. * Return a non-zero value if any operations are pending on conn.
  2139. * Operation op2ignore is ignored (okay to pass NULL). Typically, op2ignore
  2140. * is the caller's op (because the caller wants to check if all other
  2141. * ops are done).
  2142. * If test_resultsent is non-zero, operations that have already sent
  2143. * a result to the client are ignored.
  2144. * Call this function with conn->c_mutex locked.
  2145. */
  2146. int
  2147. connection_operations_pending( Connection *conn, Operation *op2ignore,
  2148. int test_resultsent )
  2149. {
  2150. Operation *op;
  2151. PR_ASSERT( conn != NULL );
  2152. for ( op = conn->c_ops; op != NULL; op = op->o_next ) {
  2153. if ( op == op2ignore ) {
  2154. continue;
  2155. }
  2156. if ( !test_resultsent || op->o_status != SLAPI_OP_STATUS_RESULT_SENT ) {
  2157. break;
  2158. }
  2159. }
  2160. return( op != NULL );
  2161. }
  2162. /* Copy the authorization identity from the connection struct into the
  2163. * operation struct. We do this late, because an operation might start
  2164. * before authentication is complete, at least on an SSL connection.
  2165. * We want each operation to get its authorization identity after the
  2166. * SSL software has had its chance to finish the SSL handshake;
  2167. * that is, after the first few bytes of the request are received.
  2168. * In particular, we want the first request from an LDAPS client
  2169. * to have an authorization identity derived from the initial SSL
  2170. * handshake.
  2171. */
  2172. static void
  2173. op_copy_identity(Connection *conn, Operation *op)
  2174. {
  2175. size_t dnlen;
  2176. size_t typelen;
  2177. PR_Lock( conn->c_mutex );
  2178. dnlen= conn->c_dn ? strlen (conn->c_dn) : 0;
  2179. typelen= conn->c_authtype ? strlen (conn->c_authtype) : 0;
  2180. slapi_sdn_done(&op->o_sdn);
  2181. slapi_ch_free_string(&(op->o_authtype));
  2182. if (dnlen <= 0 && typelen <= 0) {
  2183. op->o_authtype = NULL;
  2184. } else {
  2185. char* id = slapi_ch_malloc (typelen + 1);
  2186. if (typelen <= 0)
  2187. id[dnlen+1] = '\0';
  2188. else
  2189. memcpy (id, conn->c_authtype, typelen + 1);
  2190. slapi_sdn_set_dn_byval(&op->o_sdn,conn->c_dn);
  2191. op->o_authtype = id;
  2192. }
  2193. /* XXX We should also copy c_client_cert into *op here; it's
  2194. * part of the authorization identity. The operation's copy
  2195. * (not c_client_cert) should be used for access control.
  2196. */
  2197. /* copy isroot flag as well so root DN privileges are preserved */
  2198. op->o_isroot = conn->c_isroot;
  2199. PR_Unlock( conn->c_mutex );
  2200. }
  2201. static int
  2202. is_ber_too_big(const Connection *conn, ber_len_t ber_len)
  2203. {
  2204. ber_len_t maxbersize= config_get_maxbersize();
  2205. if(ber_len > maxbersize)
  2206. {
  2207. log_ber_too_big_error(conn, ber_len, maxbersize);
  2208. return 1;
  2209. }
  2210. return 0;
  2211. }
  2212. /*
  2213. * Pass 0 for maxbersize if you do not have it handy. It is also OK to pass
  2214. * 0 for ber_len, in which case a slightly less informative message is
  2215. * logged.
  2216. */
  2217. static void
  2218. log_ber_too_big_error(const Connection *conn, ber_len_t ber_len,
  2219. ber_len_t maxbersize)
  2220. {
  2221. if (0 == maxbersize) {
  2222. maxbersize= config_get_maxbersize();
  2223. }
  2224. if (0 == ber_len) {
  2225. slapi_log_error( SLAPI_LOG_FATAL, "connection",
  2226. "conn=%" NSPRIu64 " fd=%d Incoming BER Element was too long, max allowable"
  2227. " is %u bytes. Change the nsslapd-maxbersize attribute in"
  2228. " cn=config to increase.\n",
  2229. conn->c_connid, conn->c_sd, maxbersize );
  2230. } else {
  2231. slapi_log_error( SLAPI_LOG_FATAL, "connection",
  2232. "conn=%" NSPRIu64 " fd=%d Incoming BER Element was %u bytes, max allowable"
  2233. " is %u bytes. Change the nsslapd-maxbersize attribute in"
  2234. " cn=config to increase.\n",
  2235. conn->c_connid, conn->c_sd, ber_len, maxbersize );
  2236. }
  2237. }
  2238. void
  2239. disconnect_server( Connection *conn, PRUint64 opconnid, int opid, PRErrorCode reason, PRInt32 error )
  2240. {
  2241. PR_Lock( conn->c_mutex );
  2242. disconnect_server_nomutex( conn, opconnid, opid, reason, error );
  2243. PR_Unlock( conn->c_mutex );
  2244. }
  2245. static ps_wakeup_all_fn_ptr ps_wakeup_all_fn = NULL;
  2246. /*
  2247. * disconnect_server - close a connection. takes the connection to close,
  2248. * the connid associated with the operation generating the close (so we
  2249. * don't accidentally close a connection that's not ours), and the opid
  2250. * of the operation generating the close (for logging purposes).
  2251. */
  2252. void
  2253. disconnect_server_nomutex( Connection *conn, PRUint64 opconnid, int opid, PRErrorCode reason, PRInt32 error )
  2254. {
  2255. if ( ( conn->c_sd != SLAPD_INVALID_SOCKET &&
  2256. conn->c_connid == opconnid ) && !(conn->c_flags & CONN_FLAG_CLOSING) ) {
  2257. /*
  2258. * PR_Close must be called before anything else is done because
  2259. * of NSPR problem on NT which requires that the socket on which
  2260. * I/O timed out is closed before any other I/O operation is
  2261. * attempted by the thread.
  2262. * WARNING : As of today the current code does not fulfill the
  2263. * requirements above.
  2264. */
  2265. /* Mark that the socket should be closed on this connection.
  2266. * We don't want to actually close the socket here, because
  2267. * the listener thread could be PR_Polling over it right now.
  2268. * The last thread to stop using the connection will do the closing.
  2269. */
  2270. conn->c_flags |= CONN_FLAG_CLOSING;
  2271. g_decrement_current_conn_count();
  2272. /*
  2273. * Print the error captured above.
  2274. */
  2275. if (error && (EPIPE != error) ) {
  2276. slapi_log_access( LDAP_DEBUG_STATS,
  2277. "conn=%" NSPRIu64 " op=%d fd=%d closed error %d (%s) - %s\n",
  2278. conn->c_connid, opid, conn->c_sd, error,
  2279. slapd_system_strerror(error),
  2280. slapd_pr_strerror(reason));
  2281. } else {
  2282. slapi_log_access( LDAP_DEBUG_STATS,
  2283. "conn=%" NSPRIu64 " op=%d fd=%d closed - %s\n",
  2284. conn->c_connid, opid, conn->c_sd,
  2285. slapd_pr_strerror(reason));
  2286. }
  2287. if (! config_check_referral_mode()) {
  2288. slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsConnections);
  2289. }
  2290. conn->c_gettingber = 0;
  2291. connection_abandon_operations( conn );
  2292. if (! config_check_referral_mode()) {
  2293. /*
  2294. * If any of the outstanding operations on this
  2295. * connection were persistent searches, then
  2296. * ding all the persistent searches to get them
  2297. * to notice that their operations have been abandoned.
  2298. */
  2299. int found_ps = 0;
  2300. Operation *o;
  2301. for ( o = conn->c_ops; !found_ps && o != NULL; o = o->o_next ) {
  2302. if ( o->o_flags & OP_FLAG_PS ) {
  2303. found_ps = 1;
  2304. }
  2305. }
  2306. if ( found_ps ) {
  2307. if ( NULL == ps_wakeup_all_fn ) {
  2308. if ( get_entry_point( ENTRY_POINT_PS_WAKEUP_ALL,
  2309. (caddr_t *)(&ps_wakeup_all_fn )) == 0 ) {
  2310. (ps_wakeup_all_fn)();
  2311. }
  2312. } else {
  2313. (ps_wakeup_all_fn)();
  2314. }
  2315. }
  2316. }
  2317. }
  2318. }
  2319. void
  2320. connection_abandon_operations( Connection *c )
  2321. {
  2322. Operation *op;
  2323. for ( op = c->c_ops; op != NULL; op = op->o_next )
  2324. {
  2325. /* abandon the operation only if it is not yet
  2326. completed (i.e., no result has been sent yet to
  2327. the client */
  2328. if ( op->o_status != SLAPI_OP_STATUS_RESULT_SENT ) {
  2329. op->o_status = SLAPI_OP_STATUS_ABANDONED;
  2330. }
  2331. }
  2332. }