| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764277427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487448844894490449144924493449444954496449744984499450045014502450345044505450645074508450945104511451245134514451545164517451845194520452145224523452445254526452745284529453045314532453345344535453645374538453945404541454245434544454545464547454845494550455145524553455445554556455745584559456045614562456345644565456645674568456945704571457245734574457545764577457845794580458145824583458445854586458745884589459045914592459345944595459645974598459946004601460246034604460546064607460846094610461146124613461446154616461746184619462046214622462346244625462646274628462946304631463246334634463546364637463846394640464146424643464446454646464746484649465046514652465346544655465646574658465946604661466246634664466546664667466846694670467146724673467446754676467746784679468046814682468346844685468646874688468946904691469246934694469546964697469846994700470147024703470447054706470747084709471047114712471347144715471647174718471947204721472247234724472547264727472847294730473147324733473447354736473747384739474047414742474347444745474647474748474947504751475247534754475547564757475847594760476147624763476447654766476747684769477047714772477347744775477647774778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527752785279528052815282528352845285528652875288528952905291529252935294529552965297529852995300530153025303530453055306530753085309531053115312531353145315531653175318531953205321532253235324532553265327532853295330533153325333533453355336533753385339534053415342534353445345534653475348534953505351535253535354535553565357535853595360536153625363536453655366536753685369537053715372537353745375537653775378537953805381538253835384538553865387538853895390539153925393539453955396539753985399540054015402540354045405540654075408540954105411541254135414541554165417541854195420542154225423542454255426542754285429543054315432543354345435543654375438543954405441544254435444544554465447544854495450545154525453545454555456545754585459546054615462546354645465546654675468546954705471547254735474547554765477547854795480548154825483548454855486548754885489549054915492549354945495549654975498549955005501550255035504550555065507550855095510551155125513551455155516551755185519552055215522552355245525552655275528552955305531553255335534553555365537553855395540554155425543554455455546554755485549555055515552555355545555555655575558555955605561556255635564556555665567556855695570557155725573557455755576557755785579558055815582558355845585558655875588558955905591559255935594559555965597559855995600560156025603560456055606560756085609561056115612561356145615561656175618561956205621562256235624562556265627562856295630563156325633563456355636563756385639564056415642564356445645564656475648564956505651565256535654565556565657565856595660566156625663566456655666566756685669567056715672567356745675567656775678567956805681568256835684568556865687568856895690569156925693569456955696569756985699570057015702570357045705570657075708570957105711571257135714571557165717571857195720572157225723572457255726572757285729573057315732573357345735573657375738573957405741574257435744574557465747574857495750575157525753575457555756575757585759576057615762576357645765576657675768576957705771577257735774577557765777577857795780578157825783578457855786578757885789579057915792579357945795579657975798579958005801580258035804580558065807580858095810581158125813581458155816581758185819582058215822582358245825582658275828582958305831583258335834583558365837583858395840584158425843584458455846584758485849585058515852585358545855585658575858585958605861586258635864586558665867586858695870587158725873587458755876587758785879588058815882588358845885588658875888588958905891589258935894589558965897589858995900590159025903590459055906590759085909591059115912591359145915591659175918591959205921592259235924592559265927592859295930593159325933593459355936593759385939594059415942594359445945594659475948594959505951595259535954595559565957595859595960596159625963596459655966596759685969597059715972597359745975597659775978597959805981598259835984598559865987598859895990599159925993599459955996599759985999600060016002600360046005600660076008600960106011601260136014601560166017601860196020602160226023602460256026602760286029603060316032603360346035603660376038603960406041604260436044604560466047604860496050605160526053605460556056605760586059606060616062606360646065606660676068606960706071607260736074607560766077607860796080608160826083608460856086608760886089609060916092609360946095609660976098609961006101610261036104610561066107610861096110611161126113611461156116611761186119612061216122612361246125612661276128612961306131613261336134613561366137613861396140614161426143614461456146614761486149615061516152615361546155615661576158615961606161616261636164616561666167616861696170617161726173617461756176617761786179618061816182618361846185618661876188618961906191619261936194619561966197619861996200620162026203620462056206620762086209621062116212621362146215621662176218621962206221622262236224622562266227622862296230623162326233623462356236623762386239624062416242624362446245624662476248624962506251625262536254625562566257625862596260626162626263626462656266626762686269627062716272627362746275627662776278627962806281628262836284628562866287628862896290629162926293629462956296629762986299630063016302630363046305630663076308630963106311631263136314631563166317631863196320632163226323632463256326632763286329633063316332633363346335633663376338633963406341634263436344634563466347634863496350635163526353635463556356635763586359636063616362636363646365636663676368636963706371637263736374637563766377637863796380638163826383638463856386638763886389639063916392639363946395639663976398639964006401640264036404640564066407640864096410641164126413641464156416641764186419642064216422642364246425642664276428642964306431643264336434643564366437643864396440644164426443644464456446644764486449645064516452645364546455645664576458645964606461646264636464646564666467646864696470647164726473647464756476647764786479648064816482648364846485648664876488648964906491649264936494649564966497649864996500650165026503650465056506650765086509651065116512651365146515651665176518651965206521652265236524652565266527652865296530653165326533 |
- /** BEGIN COPYRIGHT BLOCK
- * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission.
- * Copyright (C) 2005 Red Hat, Inc.
- * Copyright (C) 2010 Hewlett-Packard Development Company, L.P.
- * All rights reserved.
- *
- * License: GPL (version 3 or any later version).
- * See LICENSE for details.
- * END COPYRIGHT BLOCK **/
- #ifdef HAVE_CONFIG_H
- #include <config.h>
- #endif
- /* cl5_api.c - implementation of 5.0 style changelog API */
- #include <unistd.h>
- #include <errno.h>
- #include <sys/stat.h>
- #if defined(OS_solaris) || defined(hpux)
- #include <sys/types.h>
- #include <sys/statvfs.h>
- #endif
- #if defined(linux)
- #include <sys/vfs.h>
- #endif
- #include "cl5.h"
- #include "cl_crypt.h"
- #include "plhash.h"
- #include "plstr.h"
- #include "db.h"
- #include "cl5_clcache.h" /* To use the Changelog Cache */
- #include "repl5.h" /* for agmt_get_consumer_rid() */
- #define GUARDIAN_FILE "guardian" /* name of the guardian file */
- #define VERSION_FILE "DBVERSION" /* name of the version file */
- #define V_5 5 /* changelog entry version */
- #define CHUNK_SIZE 64 * 1024
- #define DBID_SIZE 64
- #define FILE_SEP "_" /* separates parts of the db file name */
- #define T_CSNSTR "csn"
- #define T_UNIQUEIDSTR "nsuniqueid"
- #define T_PARENTIDSTR "parentuniqueid"
- #define T_NEWSUPERIORDNSTR "newsuperiordn"
- #define T_NEWSUPERIORIDSTR "newsuperioruniqueid"
- #define T_REPLGEN "replgen"
- #define ENTRY_COUNT_TIME 111 /* this time is used to construct csn \
- used to store/retrieve entry count */
- #define PURGE_RUV_TIME 222 /* this time is used to construct csn \
- used to store purge RUV vector */
- #define MAX_RUV_TIME 333 /* this time is used to construct csn \
- used to store upper boundary RUV vector */
- #define DB_EXTENSION_DB3 "db3"
- #define DB_EXTENSION_DB4 "db4"
- #if 1000 * DB_VERSION_MAJOR + 100 * DB_VERSION_MINOR >= 5000
- #define DB_EXTENSION "db"
- #else
- #define DB_EXTENSION "db4"
- #endif
- #define HASH_BACKETS_COUNT 16 /* number of buckets in a hash table */
- #define DEFAULT_DB_ENV_OP_FLAGS DB_AUTO_COMMIT
- #define DB_OPEN(oflags, db, txnid, file, database, type, flags, mode, rval) \
- { \
- if (((oflags)&DB_INIT_TXN) && ((oflags)&DB_INIT_LOG)) { \
- (rval) = (db)->open((db), (txnid), (file), (database), (type), (flags) | DB_AUTO_COMMIT, (mode)); \
- } else { \
- (rval) = (db)->open((db), (txnid), (file), (database), (type), (flags), (mode)); \
- } \
- }
- #define TXN_BEGIN(env, parent_txn, tid, flags) \
- (env)->txn_begin((env), (parent_txn), (tid), (flags))
- #define TXN_COMMIT(txn) (txn)->commit((txn), 0)
- #define TXN_ABORT(txn) (txn)->abort(txn)
- /*
- * The defult thread stacksize for nspr21 is 64k. For OSF, we require
- * a larger stacksize as actual storage allocation is higher i.e
- * pointers are allocated 8 bytes but lower 4 bytes are used.
- * The value 0 means use the default stacksize.
- */
- #if defined(__LP64__) || defined(_LP64) /* 64-bit architectures need bigger stacks */
- #if defined(__hpux) && defined(__ia64)
- #define DEFAULT_THREAD_STACKSIZE 524288L
- #else
- #define DEFAULT_THREAD_STACKSIZE 131072L
- #endif
- #else
- #define DEFAULT_THREAD_STACKSIZE 0
- #endif
- #define FILE_CREATE_MODE S_IRUSR | S_IWUSR
- #define DIR_CREATE_MODE 0755
- #define NO_DISK_SPACE 1024
- #define MIN_DISK_SPACE 10485760 /* 10 MB */
- /***** Data Definitions *****/
- /* possible changelog open modes */
- typedef enum {
- CL5_OPEN_NONE, /* nothing specified */
- CL5_OPEN_NORMAL, /* open for normal read/write use */
- CL5_OPEN_RESTORE_RECOVER, /* restore from archive and recover */
- CL5_OPEN_RESTORE, /* restore, but no recovery */
- CL5_OPEN_LDIF2CL, /* open as part of ldif2cl: no locking,
- recovery, checkpointing */
- CL5_OPEN_CLEAN_RECOVER /* remove env after recover open (upgrade) */
- } CL5OpenMode;
- #define DB_FILE_DELETED 0x1
- #define DB_FILE_INIT 0x2
- /* this structure represents one changelog file, Each changelog file contains
- changes applied to a single backend. Files are named by the database id */
- typedef struct cl5dbfile
- {
- char *name; /* file name (with the extension) */
- char *replGen; /* replica generation of the data */
- char *replName; /* replica name */
- DB *db; /* db handle to the changelog file*/
- int entryCount; /* number of entries in the file */
- int flags; /* currently used to mark the file as deleted
- * or as initialized */
- RUV *purgeRUV; /* ruv to which the file has been purged */
- RUV *maxRUV; /* ruv that marks the upper boundary of the data */
- } CL5DBFile;
- /* structure that allows to iterate through entries to be sent to a consumer
- that originated on a particular supplier. */
- struct cl5replayiterator
- {
- Object *fileObj;
- CLC_Buffer *clcache; /* changelog cache */
- ReplicaId consumerRID; /* consumer's RID */
- const RUV *consumerRuv; /* consumer's update vector */
- Object *supplierRuvObj; /* supplier's update vector object */
- };
- typedef struct cl5iterator
- {
- DBC *cursor; /* current position in the db file */
- Object *file; /* handle to release db file object */
- } CL5Iterator;
- /* changelog trimming configuration */
- typedef struct cl5trim
- {
- time_t maxAge; /* maximum entry age in seconds */
- int maxEntries; /* maximum number of entries across all changelog files */
- int compactInterval; /* interval to compact changelog db */
- int trimInterval; /* trimming interval */
- PRLock *lock; /* controls access to trimming configuration */
- } CL5Trim;
- /* this structure defines 5.0 changelog internals */
- typedef struct cl5desc
- {
- char *dbDir; /* absolute path to changelog directory */
- DB_ENV *dbEnv; /* db environment shared by all db files */
- int dbEnvOpenFlags; /* openflag used for env->open */
- Objset *dbFiles; /* ref counted set of changelog files (CL5DBFile) */
- PRLock *fileLock; /* ensures that changelog file is not added twice */
- CL5OpenMode dbOpenMode; /* how we open db */
- CL5DBConfig dbConfig; /* database configuration params */
- CL5Trim dbTrim; /* trimming parameters */
- CL5State dbState; /* changelog current state */
- Slapi_RWLock *stLock; /* lock that controls access to the changelog state */
- PRBool dbRmOnClose; /* indicates whether changelog should be removed when
- it is closed */
- PRBool fatalError; /* bad stuff happened like out of disk space; don't
- write guardian file on close - UnUsed so far */
- int threadCount; /* threads that globally access changelog like
- deadlock detection, etc. */
- PRLock *clLock; /* Lock associated to clVar, used to notify threads on close */
- PRCondVar *clCvar; /* Condition Variable used to notify threads on close */
- void *clcrypt_handle; /* for cl encryption */
- } CL5Desc;
- typedef void (*VFP)(void *);
- /***** Global Variables *****/
- static CL5Desc s_cl5Desc = {0};
- /***** Forward Declarations *****/
- /* changelog initialization and cleanup */
- static int _cl5Open(const char *dir, const CL5DBConfig *config, CL5OpenMode openMode);
- static int _cl5AppInit(void);
- static int _cl5DBOpen(void);
- static void _cl5SetDefaultDBConfig(void);
- static void _cl5SetDBConfig(const CL5DBConfig *config);
- static int _cl5CheckDBVersion(void);
- static int _cl5ReadDBVersion(const char *dir, char *clVersion, int buflen);
- static int _cl5WriteDBVersion(void);
- static void _cl5Close(void);
- static int _cl5Delete(const char *dir, PRBool rmDir);
- static void _cl5DBClose(void);
- /* thread management */
- static int _cl5DispatchDBThreads(void);
- static int _cl5AddThread(void);
- static void _cl5RemoveThread(void);
- /* functions that work with individual changelog files */
- static int _cl5NewDBFile(const char *replName, const char *replGen, CL5DBFile **dbFile);
- static int _cl5DBOpenFile(Object *replica, Object **obj, PRBool checkDups);
- static int _cl5DBOpenFileByReplicaName(const char *replName, const char *replGen, Object **obj, PRBool checkDups);
- static void _cl5DBCloseFile(void **data);
- static void _cl5DBDeleteFile(Object *obj);
- static void _cl5DBFileInitialized(Object *obj);
- static int _cl5GetDBFile(Object *replica, Object **obj);
- static int _cl5GetDBFileByReplicaName(const char *replName, const char *replGen, Object **obj);
- static int _cl5AddDBFile(CL5DBFile *file, Object **obj);
- static int _cl5CompareDBFile(Object *el1, const void *el2);
- static char *_cl5Replica2FileName(Object *replica);
- static char *_cl5MakeFileName(const char *replName, const char *replGen);
- static PRBool _cl5FileName2Replica(const char *fileName, Object **replica);
- static int _cl5ExportFile(PRFileDesc *prFile, Object *obj);
- static PRBool _cl5ReplicaInList(Object *replica, Object **replicas);
- /* data storage and retrieval */
- static int _cl5Entry2DBData(const CL5Entry *entry, char **data, PRUint32 *len);
- static int _cl5WriteOperation(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local);
- static int _cl5WriteOperationTxn(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local, void *txn);
- static int _cl5GetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid);
- static int _cl5GetNextEntry(CL5Entry *entry, void *iterator);
- static int _cl5CurrentDeleteEntry(void *iterator);
- static PRBool _cl5IsValidIterator(const CL5Iterator *iterator);
- static int _cl5GetOperation(Object *replica, slapi_operation_parameters *op);
- static const char *_cl5OperationType2Str(int type);
- static int _cl5Str2OperationType(const char *str);
- static void _cl5WriteString(const char *str, char **buff);
- static void _cl5ReadString(char **str, char **buff);
- static void _cl5WriteMods(LDAPMod **mods, char **buff);
- static int _cl5WriteMod(LDAPMod *mod, char **buff);
- static int _cl5ReadMods(LDAPMod ***mods, char **buff);
- static int _cl5ReadMod(Slapi_Mod *mod, char **buff);
- static int _cl5GetModsSize(LDAPMod **mods);
- static int _cl5GetModSize(LDAPMod *mod);
- static void _cl5ReadBerval(struct berval *bv, char **buff);
- static void _cl5WriteBerval(struct berval *bv, char **buff);
- static int _cl5ReadBervals(struct berval ***bv, char **buff, unsigned int size);
- static int _cl5WriteBervals(struct berval **bv, char **buff, u_int32_t *size);
- /* replay iteration */
- #ifdef FOR_DEBUGGING
- static PRBool _cl5ValidReplayIterator(const CL5ReplayIterator *iterator);
- #endif
- static int _cl5PositionCursorForReplay(ReplicaId consumerRID, const RUV *consumerRuv, Object *replica, Object *fileObject, CL5ReplayIterator **iterator, int *continue_on_missing);
- static int _cl5CheckMissingCSN(const CSN *minCsn, const RUV *supplierRUV, CL5DBFile *file);
- /* changelog trimming */
- static int _cl5TrimInit(void);
- static void _cl5TrimCleanup(void);
- static int _cl5TrimMain(void *param);
- static void _cl5DoTrimming(void);
- static void _cl5CompactDBs(void);
- static void _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid);
- static int _cl5PurgeGetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);
- static int _cl5PurgeGetNextEntry(CL5Entry *entry, void *iterator, DBT *key);
- static void _cl5TrimFile(Object *obj, long *numToTrim);
- static PRBool _cl5CanTrim(time_t time, long *numToTrim);
- static int _cl5ReadRUV(const char *replGen, Object *obj, PRBool purge);
- static int _cl5WriteRUV(CL5DBFile *file, PRBool purge);
- static int _cl5ConstructRUV(const char *replGen, Object *obj, PRBool purge);
- static int _cl5UpdateRUV(Object *obj, CSN *csn, PRBool newReplica, PRBool purge);
- static int _cl5GetRUV2Purge2(Object *fileObj, RUV **ruv);
- void trigger_cl_purging_thread(void *rid);
- /* bakup/recovery, import/export */
- static int _cl5LDIF2Operation(char *ldifEntry, slapi_operation_parameters *op, char **replGen);
- static int _cl5Operation2LDIF(const slapi_operation_parameters *op, const char *replGen, char **ldifEntry, PRInt32 *lenLDIF);
- /* entry count */
- static int _cl5GetEntryCount(CL5DBFile *file);
- static int _cl5WriteEntryCount(CL5DBFile *file);
- /* misc */
- static char *_cl5GetHelperEntryKey(int type, char *csnStr);
- static Object *_cl5GetReplica(const slapi_operation_parameters *op, const char *replGen);
- static int _cl5FileEndsWith(const char *filename, const char *ext);
- static PRLock *cl5_diskfull_lock = NULL;
- static int cl5_diskfull_flag = 0;
- static void cl5_set_diskfull(void);
- static void cl5_set_no_diskfull(void);
- /***** Module APIs *****/
- /* Name: cl5Init
- Description: initializes changelog module; must be called by a single thread
- before any other changelog function.
- Parameters: none
- Return: CL5_SUCCESS if function is successful;
- CL5_SYSTEM_ERROR error if NSPR call fails.
- */
- int
- cl5Init(void)
- {
- s_cl5Desc.stLock = slapi_new_rwlock();
- if (s_cl5Desc.stLock == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5Init - Failed to create state lock; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- if ((s_cl5Desc.clLock = PR_NewLock()) == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5Init - Failed to create on close lock; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- if ((s_cl5Desc.clCvar = PR_NewCondVar(s_cl5Desc.clLock)) == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5Init - Failed to create on close cvar; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- if ((clcache_init(&s_cl5Desc.dbEnv) != 0)) {
- return CL5_SYSTEM_ERROR;
- }
- s_cl5Desc.dbState = CL5_STATE_CLOSED;
- s_cl5Desc.fatalError = PR_FALSE;
- s_cl5Desc.dbRmOnClose = PR_FALSE;
- s_cl5Desc.threadCount = 0;
- if (NULL == cl5_diskfull_lock) {
- cl5_diskfull_lock = PR_NewLock();
- }
- return CL5_SUCCESS;
- }
- /* Name: cl5Cleanup
- Description: performs cleanup of the changelog module; must be called by a single
- thread; it closes changelog if it is still open.
- Parameters: none
- Return: none
- */
- void
- cl5Cleanup()
- {
- /* close db if it is still open */
- if (s_cl5Desc.dbState == CL5_STATE_OPEN) {
- cl5Close();
- }
- if (s_cl5Desc.stLock)
- slapi_destroy_rwlock(s_cl5Desc.stLock);
- s_cl5Desc.stLock = NULL;
- if (cl5_diskfull_lock) {
- PR_DestroyLock(cl5_diskfull_lock);
- cl5_diskfull_lock = NULL;
- }
- if (s_cl5Desc.clLock != NULL) {
- PR_DestroyLock(s_cl5Desc.clLock);
- s_cl5Desc.clLock = NULL;
- }
- if (s_cl5Desc.clCvar != NULL) {
- PR_DestroyCondVar(s_cl5Desc.clCvar);
- s_cl5Desc.clCvar = NULL;
- }
- memset(&s_cl5Desc, 0, sizeof(s_cl5Desc));
- }
- /* Name: cl5Open
- Description: opens changelog; must be called after changelog is
- initialized using cl5Init. It is thread safe and the second
- call is ignored.
- Parameters: dir - changelog dir
- config - db configuration parameters; currently not used
- Return: CL5_SUCCESS if successfull;
- CL5_BAD_DATA if invalid directory is passed;
- CL5_BAD_STATE if changelog is not initialized;
- CL5_BAD_DBVERSION if dbversion file is missing or has unexpected data
- CL5_SYSTEM_ERROR if NSPR error occured (during db directory creation);
- CL5_MEMORY_ERROR if memory allocation fails;
- CL5_DB_ERROR if db initialization fails.
- */
- int
- cl5Open(const char *dir, const CL5DBConfig *config)
- {
- int rc;
- if (dir == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "cl5Open: null directory\n");
- return CL5_BAD_DATA;
- }
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5Open - Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- /* prevent state from changing */
- slapi_rwlock_wrlock(s_cl5Desc.stLock);
- /* already open - ignore */
- if (s_cl5Desc.dbState == CL5_STATE_OPEN) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5Open - changelog already opened; request ignored\n");
- rc = CL5_SUCCESS;
- goto done;
- } else if (s_cl5Desc.dbState != CL5_STATE_CLOSED) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5Open - Invalid state - %d\n", s_cl5Desc.dbState);
- rc = CL5_BAD_STATE;
- goto done;
- }
- rc = _cl5Open(dir, config, CL5_OPEN_NORMAL);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5Open - Failed to open changelog\n");
- goto done;
- }
- /* dispatch global threads like deadlock detection, trimming, etc */
- rc = _cl5DispatchDBThreads();
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5Open - Failed to start database monitoring threads\n");
- _cl5Close();
- } else {
- s_cl5Desc.dbState = CL5_STATE_OPEN;
- clcache_set_config();
- /* Set the cl encryption algorithm (if configured) */
- rc = clcrypt_init(config, &s_cl5Desc.clcrypt_handle);
- }
- done:
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- return rc;
- }
- /* Name: cl5Close
- Description: closes changelog; waits until all threads are done using changelog;
- call is ignored if changelog is already closed.
- Parameters: none
- Return: CL5_SUCCESS if successful;
- CL5_BAD_STATE if db is not in the open or closed state;
- CL5_SYSTEM_ERROR if NSPR call fails;
- CL5_DB_ERROR if db shutdown fails
- */
- int
- cl5Close()
- {
- int rc = CL5_SUCCESS;
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5Close - Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- slapi_rwlock_wrlock(s_cl5Desc.stLock);
- /* already closed - ignore */
- if (s_cl5Desc.dbState == CL5_STATE_CLOSED) {
- slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
- "cl5Close - Changelog closed; request ignored\n");
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- return CL5_SUCCESS;
- } else if (s_cl5Desc.dbState != CL5_STATE_OPEN) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5Close - Invalid state - %d\n", s_cl5Desc.dbState);
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- return CL5_BAD_STATE;
- }
- /* signal changelog closing to all threads */
- s_cl5Desc.dbState = CL5_STATE_CLOSING;
- PR_Lock(s_cl5Desc.clLock);
- PR_NotifyCondVar(s_cl5Desc.clCvar);
- PR_Unlock(s_cl5Desc.clLock);
- _cl5Close();
- s_cl5Desc.dbState = CL5_STATE_CLOSED;
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- return rc;
- }
- /* Name: cl5Delete
- Description: removes changelog; changelog must be in the closed state.
- Parameters: dir - changelog directory
- Return: CL5_SUCCESS if successful;
- CL5_BAD_STATE if the changelog is not in closed state;
- CL5_BAD_DATA if invalid directory supplied
- CL5_SYSTEM_ERROR if NSPR call fails
- */
- int
- cl5Delete(const char *dir)
- {
- int rc;
- if (dir == NULL) {
- slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, "cl5Delete - NULL directory\n");
- return CL5_BAD_DATA;
- }
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5Delete - Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- slapi_rwlock_wrlock(s_cl5Desc.stLock);
- if (s_cl5Desc.dbState != CL5_STATE_CLOSED) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5Delete - Invalid state - %d\n", s_cl5Desc.dbState);
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- return CL5_BAD_STATE;
- }
- rc = _cl5Delete(dir, PR_TRUE /* remove changelog dir */);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5Delete - Failed to remove changelog\n");
- }
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- return rc;
- }
- /* Name: cl5DeleteDBSync
- Description: The same as cl5DeleteDB except the function does not return
- until the file is removed.
- */
- int
- cl5DeleteDBSync(Object *replica)
- {
- Object *obj;
- int rc;
- CL5DBFile *file;
- if (replica == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5DeleteDBSync - invalid database id\n");
- return CL5_BAD_DATA;
- }
- /* changelog is not initialized */
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDBSync - "
- "Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- /* make sure that changelog stays open while operation is in progress */
- rc = _cl5AddThread();
- if (rc != CL5_SUCCESS)
- return rc;
- rc = _cl5GetDBFile(replica, &obj);
- if (rc == CL5_SUCCESS) {
- char *filename = NULL;
- file = (CL5DBFile *)object_get_data(obj);
- PR_ASSERT(file);
- /* file->name is freed in _cl5DBDeleteFile */
- filename = slapi_ch_strdup(file->name);
- _cl5DBDeleteFile(obj);
- /* wait until the file is gone */
- while (PR_Access(filename, PR_ACCESS_EXISTS) == PR_SUCCESS) {
- DS_Sleep(PR_MillisecondsToInterval(100));
- }
- slapi_ch_free_string(&filename);
- } else {
- Replica *r = (Replica *)object_get_data(replica);
- PR_ASSERT(r);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDBSync - "
- "File for replica at (%s) not found\n",
- slapi_sdn_get_dn(replica_get_root(r)));
- }
- _cl5RemoveThread();
- return rc;
- }
- /* Name: cl5GetUpperBoundRUV
- Description: retrieves vector for that represents the upper bound of the changes for a replica.
- Parameters: r - replica for which the purge vector is requested
- ruv - contains a copy of the purge ruv if function is successful;
- unchanged otherwise. It is responsibility of the caller to free
- the ruv when it is no longer is in use
- Return: CL5_SUCCESS if function is successful
- CL5_BAD_STATE if the changelog is not initialized;
- CL5_BAD_DATA - if NULL id is supplied
- CL5_NOTFOUND, if changelog file for replica is not found
- */
- int
- cl5GetUpperBoundRUV(Replica *r, RUV **ruv)
- {
- int rc;
- Object *r_obj, *file_obj;
- CL5DBFile *file;
- if (r == NULL || ruv == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5GetUpperBoundRUV - Invalid parameters\n");
- return CL5_BAD_DATA;
- }
- /* changelog is not initialized */
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetUpperBoundRUV - "
- "Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- /* make sure that changelog stays open while operation is in progress */
- rc = _cl5AddThread();
- if (rc != CL5_SUCCESS)
- return rc;
- /* create a temporary replica object because of the interface we have */
- r_obj = object_new(r, NULL);
- rc = _cl5GetDBFile(r_obj, &file_obj);
- if (rc == CL5_SUCCESS) {
- file = (CL5DBFile *)object_get_data(file_obj);
- PR_ASSERT(file && file->maxRUV);
- *ruv = ruv_dup(file->maxRUV);
- object_release(file_obj);
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetUpperBoundRUV - "
- "Could not find DB object for replica\n");
- }
- object_release(r_obj);
- _cl5RemoveThread();
- return rc;
- }
- /* Name: cl5ExportLDIF
- Description: dumps changelog to an LDIF file; changelog can be open or closed.
- Parameters: clDir - changelog dir
- ldifFile - full path to ldif file to write
- replicas - optional list of replicas whose changes should be exported;
- if the list is NULL, entire changelog is exported.
- Return: CL5_SUCCESS if function is successfull;
- CL5_BAD_DATA if invalid parameter is passed;
- CL5_BAD_STATE if changelog is not initialized;
- CL5_DB_ERROR if db api fails;
- CL5_SYSTEM_ERROR if NSPR call fails;
- CL5_MEMORY_ERROR if memory allocation fials.
- */
- int
- cl5ExportLDIF(const char *ldifFile, Object **replicas)
- {
- int i;
- int rc;
- PRFileDesc *prFile = NULL;
- Object *obj;
- if (ldifFile == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ExportLDIF - null ldif file name\n");
- return CL5_BAD_DATA;
- }
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ExportLDIF - Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- /* make sure that changelog is open while operation is in progress */
- rc = _cl5AddThread();
- if (rc != CL5_SUCCESS)
- return rc;
- prFile = PR_Open(ldifFile, PR_WRONLY | PR_CREATE_FILE | PR_TRUNCATE, 0600);
- if (prFile == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ExportLDIF - Failed to open (%s) file; NSPR error - %d\n",
- ldifFile, PR_GetError());
- rc = CL5_SYSTEM_ERROR;
- goto done;
- }
- slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
- "cl5ExportLDIF: starting changelog export to (%s) ...\n", ldifFile);
- if (replicas) /* export only selected files */
- {
- for (i = 0; replicas[i]; i++) {
- rc = _cl5GetDBFile(replicas[i], &obj);
- if (rc == CL5_SUCCESS) {
- rc = _cl5ExportFile(prFile, obj);
- object_release(obj);
- } else {
- Replica *r = (Replica *)object_get_data(replicas[i]);
- PR_ASSERT(r);
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "cl5ExportLDIF - "
- "Failed to locate changelog file for replica at (%s)\n",
- slapi_sdn_get_dn(replica_get_root(r)));
- }
- }
- } else /* export all files */
- {
- for (obj = objset_first_obj(s_cl5Desc.dbFiles); obj;
- obj = objset_next_obj(s_cl5Desc.dbFiles, obj)) {
- rc = _cl5ExportFile(prFile, obj);
- }
- }
- rc = CL5_SUCCESS;
- done:;
- _cl5RemoveThread();
- if (rc == CL5_SUCCESS)
- slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
- "cl5ExportLDIF - Changelog export is finished.\n");
- if (prFile)
- PR_Close(prFile);
- return rc;
- }
- /* Name: cl5ImportLDIF
- Description: imports ldif file into changelog; changelog must be in the closed state
- Parameters: clDir - changelog dir
- ldifFile - absolute path to the ldif file to import
- replicas - list of replicas whose data should be imported.
- Return: CL5_SUCCESS if function is successfull;
- CL5_BAD_DATA if invalid parameter is passed;
- CL5_BAD_STATE if changelog is open or not inititalized;
- CL5_DB_ERROR if db api fails;
- CL5_SYSTEM_ERROR if NSPR call fails;
- CL5_MEMORY_ERROR if memory allocation fials.
- */
- int
- cl5ImportLDIF(const char *clDir, const char *ldifFile, Object **replicas)
- {
- #if defined(USE_OPENLDAP)
- LDIFFP *file = NULL;
- int buflen;
- ldif_record_lineno_t lineno = 0;
- #else
- FILE *file = NULL;
- int lineno = 0;
- #endif
- int rc;
- char *buff = NULL;
- slapi_operation_parameters op;
- Object *prim_replica_obj = NULL;
- Object *replica_obj = NULL;
- Object *file_obj = NULL;
- Replica *prim_replica = NULL;
- char *replGen = NULL;
- CL5DBFile *dbfile = NULL;
- struct berval **purgevals = NULL;
- struct berval **maxvals = NULL;
- int purgeidx = 0;
- int maxidx = 0;
- int maxpurgesz = 0;
- int maxmaxsz = 0;
- int entryCount = 0;
- /* validate params */
- if (ldifFile == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ImportLDIF - null ldif file name\n");
- return CL5_BAD_DATA;
- }
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ImportLDIF - Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- if (replicas == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ImportLDIF - null list of replicas\n");
- return CL5_BAD_DATA;
- }
- prim_replica_obj = replicas[0];
- if (NULL == prim_replica_obj) {
- /* Never happens for now. (see replica_execute_ldif2cl_task) */
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ImportLDIF - empty replica list\n");
- return CL5_BAD_DATA;
- }
- prim_replica = (Replica *)object_get_data(prim_replica_obj);
- /* make sure that nobody change changelog state while import is in progress */
- slapi_rwlock_wrlock(s_cl5Desc.stLock);
- /* make sure changelog is closed */
- if (s_cl5Desc.dbState != CL5_STATE_CLOSED) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ImportLDIF - Invalid state - %d \n", s_cl5Desc.dbState);
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- return CL5_BAD_STATE;
- }
- /* open LDIF file */
- #if defined(USE_OPENLDAP)
- file = ldif_open(ldifFile, "r");
- #else
- file = fopen(ldifFile, "r"); /* XXXggood Does fopen reliably work if > 255 files open? */
- #endif
- if (file == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ImportLDIF - Failed to open (%s) ldif file; system error - %d\n",
- ldifFile, errno);
- rc = CL5_SYSTEM_ERROR;
- goto done;
- }
- /* remove changelog */
- rc = _cl5Delete(clDir, PR_FALSE);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ImportLDIF - Failed to remove changelog\n");
- goto done;
- }
- /* open changelog */
- rc = _cl5Open(clDir, NULL, CL5_OPEN_LDIF2CL);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ImportLDIF - Failed to open changelog\n");
- goto done;
- }
- s_cl5Desc.dbState = CL5_STATE_OPEN; /* force to change the state */
- /* read entries and write them to changelog */
- #if defined(USE_OPENLDAP)
- while (ldif_read_record(file, &lineno, &buff, &buflen))
- #else
- while ((buff = ldif_get_entry(file, &lineno)) != NULL)
- #endif
- {
- rc = _cl5LDIF2Operation(buff, &op, &replGen);
- if (rc != CL5_SUCCESS) {
- /*
- * clpurgeruv: {replicageneration} 4d13a124000000010000
- * clpurgeruv: {replica 2 ldap://host:port}
- * clpurgeruv: {replica 1 ldap://host:port}
- * clmaxruv: {replicageneration} 4d13a124000000010000
- * clmaxruv: {replica 2} <mincsn> <maxcsn> <timestamp>
- * clmaxruv: {replica 1} <mincsn> <maxcsn> <timestamp>
- */
- char *line;
- char *next = buff;
- struct berval type, value;
- int freeval = 0;
- while ((line = ldif_getline(&next)) != NULL) {
- rc = slapi_ldif_parse_line(line, &type, &value, &freeval);
- /* ruv_dump (dbfile->purgeRUV, "clpurgeruv", prFile); */
- if (0 == strcasecmp(type.bv_val, "clpurgeruv")) {
- if (maxpurgesz < purgeidx + 2) {
- if (!maxpurgesz) {
- maxpurgesz = 4 * (purgeidx + 2);
- } else {
- maxpurgesz *= 2;
- }
- purgevals = (struct berval **)slapi_ch_realloc(
- (char *)purgevals,
- sizeof(struct berval *) * maxpurgesz);
- }
- purgevals[purgeidx++] = slapi_ch_bvdup(&value);
- purgevals[purgeidx] = NULL; /* make sure NULL terminated */
- }
- /* ruv_dump (dbfile->maxRUV, "clmaxruv", prFile); */
- else if (0 == strcasecmp(type.bv_val, "clmaxruv")) {
- if (maxmaxsz < maxidx + 2) {
- if (!maxmaxsz) {
- maxmaxsz = 4 * (maxidx + 2);
- } else {
- maxmaxsz *= 2;
- }
- maxvals = (struct berval **)slapi_ch_realloc(
- (char *)maxvals,
- sizeof(struct berval *) * maxmaxsz);
- }
- /* {replica #} min_csn csn [last_modified] */
- /* get rid of last_modified, if any */
- maxvals[maxidx++] = slapi_ch_bvdup(&value);
- maxvals[maxidx] = NULL; /* make sure NULL terminated */
- }
- if (freeval) {
- slapi_ch_free_string(&value.bv_val);
- }
- }
- slapi_ch_free_string(&buff);
- #if defined(USE_OPENLDAP)
- buflen = 0;
- #endif
- goto next;
- }
- slapi_ch_free_string(&buff);
- #if defined(USE_OPENLDAP)
- buflen = 0;
- #endif
- /* if we perform selective import, check if the operation should be wriiten to changelog */
- replica_obj = _cl5GetReplica(&op, replGen);
- if (replica_obj == NULL) {
- /*
- * changetype: delete
- * replgen: 4d13a124000000010000
- * csn: 4d23b909000000020000
- * nsuniqueid: 00000000-00000000-00000000-00000000
- * dn: cn=start iteration
- */
- rc = _cl5WriteOperation(replica_get_name(prim_replica),
- replGen, &op, 1);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ImportLDIF - "
- "Failed to write operation to the changelog: "
- "type: %lu, dn: %s\n",
- op.operation_type, REPL_GET_DN(&op.target_address));
- slapi_ch_free_string(&replGen);
- operation_parameters_done(&op);
- goto done;
- }
- entryCount++;
- goto next;
- }
- if (!replicas || _cl5ReplicaInList(replica_obj, replicas)) {
- /* write operation creates the file if it does not exist */
- rc = _cl5WriteOperation(replica_get_name((Replica *)object_get_data(replica_obj)),
- replGen, &op, 1);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ImportLDIF - "
- "Failed to write operation to the changelog: "
- "type: %lu, dn: %s\n",
- op.operation_type, REPL_GET_DN(&op.target_address));
- object_release(replica_obj);
- slapi_ch_free_string(&replGen);
- operation_parameters_done(&op);
- goto done;
- }
- entryCount++;
- }
- next:
- if (replica_obj) {
- object_release(replica_obj);
- }
- slapi_ch_free_string(&replGen);
- operation_parameters_done(&op);
- }
- /* Set RUVs and entry count */
- file_obj = objset_first_obj(s_cl5Desc.dbFiles);
- while (file_obj) {
- dbfile = (CL5DBFile *)object_get_data(file_obj);
- if (0 == strcasecmp(dbfile->replName, replica_get_name(prim_replica))) {
- break;
- }
- dbfile = NULL;
- file_obj = objset_next_obj(s_cl5Desc.dbFiles, file_obj);
- }
- if (dbfile) {
- if (purgeidx > 0) {
- ruv_destroy(&dbfile->purgeRUV);
- rc = ruv_init_from_bervals(purgevals, &dbfile->purgeRUV);
- }
- if (maxidx > 0) {
- ruv_destroy(&dbfile->maxRUV);
- rc = ruv_init_from_bervals(maxvals, &dbfile->maxRUV);
- }
- dbfile->entryCount = entryCount;
- }
- if (file_obj) {
- object_release(file_obj);
- }
- done:
- for (purgeidx = 0; purgevals && purgevals[purgeidx]; purgeidx++) {
- slapi_ch_bvfree(&purgevals[purgeidx]);
- }
- slapi_ch_free((void **)&purgevals);
- for (maxidx = 0; maxvals && maxvals[maxidx]; maxidx++) {
- slapi_ch_bvfree(&maxvals[maxidx]);
- }
- slapi_ch_free((void **)&maxvals);
- if (file) {
- #if defined(USE_OPENLDAP)
- ldif_close(file);
- #else
- fclose(file);
- #endif
- }
- if (CL5_STATE_OPEN == s_cl5Desc.dbState) {
- _cl5Close();
- s_cl5Desc.dbState = CL5_STATE_CLOSED; /* force to change the state */
- }
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- return rc;
- }
- /* Name: cl5GetState
- Description: returns database state
- Parameters: none
- Return: changelog state
- */
- int
- cl5GetState()
- {
- return s_cl5Desc.dbState;
- }
- /* Name: cl5ConfigTrimming
- Description: sets changelog trimming parameters; changelog must be open.
- Parameters: maxEntries - maximum number of entries in the chnagelog (in all files);
- maxAge - maximum entry age;
- compactInterval - interval to compact changelog db;
- trimInterval - changelog trimming interval.
- Return: CL5_SUCCESS if successful;
- CL5_BAD_STATE if changelog is not open
- */
- int
- cl5ConfigTrimming(int maxEntries, const char *maxAge, int compactInterval, int trimInterval)
- {
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ConfigTrimming - Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- /* make sure changelog is not closed while trimming configuration
- is updated.*/
- if (CL5_SUCCESS != _cl5AddThread()) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5ConfigTrimming - Could not start changelog trimming thread\n");
- return CL5_BAD_STATE;
- }
- PR_Lock(s_cl5Desc.dbTrim.lock);
- if (maxAge) {
- /* don't ignore this argument */
- if (strcmp(maxAge, CL5_STR_IGNORE) != 0) {
- s_cl5Desc.dbTrim.maxAge = slapi_parse_duration(maxAge);
- }
- } else {
- /* unlimited */
- s_cl5Desc.dbTrim.maxAge = 0;
- }
- if (maxEntries != CL5_NUM_IGNORE) {
- s_cl5Desc.dbTrim.maxEntries = maxEntries;
- }
- if (compactInterval != CL5_NUM_IGNORE) {
- s_cl5Desc.dbTrim.compactInterval = compactInterval;
- }
- if (trimInterval != CL5_NUM_IGNORE) {
- s_cl5Desc.dbTrim.trimInterval = trimInterval;
- }
- /* The config was updated, notify the changelog trimming thread */
- PR_Lock(s_cl5Desc.clLock);
- PR_NotifyCondVar(s_cl5Desc.clCvar);
- PR_Unlock(s_cl5Desc.clLock);
- PR_Unlock(s_cl5Desc.dbTrim.lock);
- _cl5RemoveThread();
- return CL5_SUCCESS;
- }
- /* Name: cl5GetOperation
- Description: retireves operation specified by its csn and databaseid
- Parameters: op - must contain csn and databaseid; the rest of data is
- filled if function is successfull
- Return: CL5_SUCCESS if function is successfull;
- CL5_BAD_DATA if invalid op is passed;
- CL5_BAD_STATE if db has not been initialized;
- CL5_NOTFOUND if entry was not found;
- CL5_DB_ERROR if any other db error occured;
- CL5_BADFORMAT if db data format does not match entry format.
- */
- int
- cl5GetOperation(Object *replica, slapi_operation_parameters *op)
- {
- int rc;
- char *agmt_name;
- agmt_name = get_thread_private_agmtname();
- if (replica == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetOperation - NULL replica\n");
- return CL5_BAD_DATA;
- }
- if (op == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetOperation - NULL operation\n");
- return CL5_BAD_DATA;
- }
- if (op->csn == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "%s: cl5GetOperation - operation contains no CSN\n", agmt_name);
- return CL5_BAD_DATA;
- }
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5GetOperation - %s - Changelog is not initialized\n", agmt_name);
- return CL5_BAD_STATE;
- }
- /* make sure that changelog is open while operation is in progress */
- rc = _cl5AddThread();
- if (rc != CL5_SUCCESS)
- return rc;
- rc = _cl5GetOperation(replica, op);
- _cl5RemoveThread();
- return rc;
- }
- /* Name: cl5GetFirstOperation
- Description: retrieves first operation for a particular database
- replica - replica for which the operation should be retrieved.
- Parameters: op - buffer to store the operation;
- iterator - to be passed to the call to cl5GetNextOperation
- Return: CL5_SUCCESS, if successful
- CL5_BADDATA, if operation is NULL
- CL5_BAD_STATE, if changelog is not open
- CL5_DB_ERROR, if db call fails
- */
- int
- cl5GetFirstOperation(Object *replica, slapi_operation_parameters *op, void **iterator)
- {
- int rc;
- CL5Entry entry;
- Object *obj;
- char *agmt_name;
- if (replica == NULL || op == NULL || iterator == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5GetFirstOperation - Invalid argument\n");
- return CL5_BAD_DATA;
- }
- *iterator = NULL;
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- agmt_name = get_thread_private_agmtname();
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5GetFirstOperation - %s - Changelog is not initialized\n", agmt_name);
- return CL5_BAD_STATE;
- }
- /* make sure that changelog stays open while operation is in progress */
- rc = _cl5AddThread();
- if (rc != CL5_SUCCESS)
- return rc;
- rc = _cl5GetDBFile(replica, &obj);
- if (rc != CL5_SUCCESS) {
- _cl5RemoveThread();
- return rc;
- }
- entry.op = op;
- /* Callers of this function should cl5_operation_parameters_done(op) */
- rc = _cl5GetFirstEntry(obj, &entry, iterator, NULL);
- object_release(obj);
- _cl5RemoveThread();
- return rc;
- }
- /* Name: cl5GetNextOperation
- Description: retrieves the next op from the changelog as defined by the iterator;
- changelog must be open.
- Parameters: op - returned operation, if function is successful
- iterator - in: identifies op to retrieve; out: identifies next op
- Return: CL5_SUCCESS, if successful
- CL5_BADDATA, if op is NULL
- CL5_BAD_STATE, if changelog is not open
- CL5_NOTFOUND, empty changelog
- CL5_DB_ERROR, if db call fails
- */
- int
- cl5GetNextOperation(slapi_operation_parameters *op, void *iterator)
- {
- CL5Entry entry;
- if (op == NULL || iterator == NULL || !_cl5IsValidIterator(iterator)) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5GetNextOperation - Invalid argument\n");
- return CL5_BAD_DATA;
- }
- if (s_cl5Desc.dbState != CL5_STATE_OPEN) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5GetNextOperation - Changelog is not open\n");
- return CL5_BAD_STATE;
- }
- /* we don't need to increment thread count since cl5GetFirstOperation
- locked the file through which we are iterating */
- entry.op = op;
- /* Callers of this function should cl5_operation_parameters_done(op) */
- return _cl5GetNextEntry(&entry, iterator);
- }
- /* Name: cl5DestroyIterator
- Description: destroys iterator once iteration through changelog is done
- Parameters: iterator - iterator to destroy
- Return: none
- */
- void
- cl5DestroyIterator(void *iterator)
- {
- CL5Iterator *it = (CL5Iterator *)iterator;
- if (it == NULL)
- return;
- /* close cursor */
- if (it->cursor)
- it->cursor->c_close(it->cursor);
- if (it->file)
- object_release(it->file);
- slapi_ch_free((void **)&it);
- }
- /* Name: cl5WriteOperationTxn
- Description: writes operation to changelog
- Parameters: replName - name of the replica to which operation applies
- replGen - replica generation for the operation
- !!!Note that we pass name and generation rather than
- replica object since generation can change while operation
- is in progress (if the data is reloaded). !!!
- op - operation to write
- local - this is a non-replicated operation
- txn - the transaction containing this operation
- Return: CL5_SUCCESS if function is successfull;
- CL5_BAD_DATA if invalid op is passed;
- CL5_BAD_STATE if db has not been initialized;
- CL5_MEMORY_ERROR if memory allocation failed;
- CL5_DB_ERROR if any other db error occured;
- */
- int
- cl5WriteOperationTxn(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local, void *txn)
- {
- int rc;
- if (op == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5WriteOperationTxn - NULL operation passed\n");
- return CL5_BAD_DATA;
- }
- if (!IsValidOperation(op)) {
- return CL5_BAD_DATA;
- }
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5WriteOperationTxn - Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- /* make sure that changelog is open while operation is in progress */
- rc = _cl5AddThread();
- if (rc != CL5_SUCCESS)
- return rc;
- rc = _cl5WriteOperationTxn(replName, replGen, op, local, txn);
- /* update the upper bound ruv vector */
- if (rc == CL5_SUCCESS) {
- Object *file_obj = NULL;
- if (_cl5GetDBFileByReplicaName(replName, replGen, &file_obj) == CL5_SUCCESS) {
- rc = _cl5UpdateRUV(file_obj, op->csn, PR_FALSE, PR_FALSE);
- object_release(file_obj);
- }
- }
- _cl5RemoveThread();
- return rc;
- }
- /* Name: cl5WriteOperation
- Description: writes operation to changelog
- Parameters: replName - name of the replica to which operation applies
- replGen - replica generation for the operation
- !!!Note that we pass name and generation rather than
- replica object since generation can change while operation
- is in progress (if the data is reloaded). !!!
- op - operation to write
- local - this is a non-replicated operation
- Return: CL5_SUCCESS if function is successfull;
- CL5_BAD_DATA if invalid op is passed;
- CL5_BAD_STATE if db has not been initialized;
- CL5_MEMORY_ERROR if memory allocation failed;
- CL5_DB_ERROR if any other db error occured;
- */
- int
- cl5WriteOperation(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local)
- {
- return cl5WriteOperationTxn(replName, replGen, op, local, NULL);
- }
- /* Name: cl5CreateReplayIterator
- Description: creates an iterator that allows to retrieve changes that should
- to be sent to the consumer identified by ruv. The iteration is performed by
- repeated calls to cl5GetNextOperationToReplay.
- Parameters: replica - replica whose data we wish to iterate;
- ruv - consumer ruv;
- iterator - iterator to be passed to cl5GetNextOperationToReplay call
- Return: CL5_SUCCESS, if function is successful;
- CL5_MISSING_DATA, if data that should be in the changelog is missing
- CL5_PURGED_DATA, if some data that consumer needs has been purged.
- Note that the iterator can be non null if the supplier contains
- some data that needs to be sent to the consumer
- CL5_NOTFOUND if the consumer is up to data with respect to the supplier
- CL5_BAD_DATA if invalid parameter is passed;
- CL5_BAD_STATE if db has not been open;
- CL5_DB_ERROR if any other db error occurred;
- CL5_MEMORY_ERROR if memory allocation fails.
- Algorithm: Build a list of csns from consumer's and supplier's ruv. For each element
- of the consumer's ruv put max csn into the csn list. For each element
- of the supplier's ruv not in the consumer's ruv put min csn from the
- supplier's ruv into the list. The list contains, for each known replica,
- the starting point for changes to be sent to the consumer.
- Sort the list in ascending order.
- Build a hash which contains, for each known replica, whether the
- supplier can bring the consumer up to data with respect to that replica.
- The hash is used to decide whether a change can be sent to the consumer
- Find the replica with the smallest csn in the list for which
- we can bring the consumer up to date.
- Position the db cursor on the change entry that corresponds to this csn.
- Hash entries are created for each replica traversed so far. sendChanges
- flag is set to FALSE for all replicas except the last traversed.
- */
- int
- cl5CreateReplayIteratorEx(Private_Repl_Protocol *prp, const RUV *consumerRuv, CL5ReplayIterator **iterator, ReplicaId consumerRID)
- {
- int rc;
- Object *replica;
- Object *obj = NULL;
- replica = prp->replica_object;
- if (replica == NULL || consumerRuv == NULL || iterator == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5CreateReplayIteratorEx - Invalid parameter\n");
- return CL5_BAD_DATA;
- }
- *iterator = NULL;
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5CreateReplayIteratorEx - Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- /* make sure that changelog is open while operation is in progress */
- rc = _cl5AddThread();
- if (rc != CL5_SUCCESS)
- return rc;
- rc = _cl5GetDBFile(replica, &obj);
- if (rc == CL5_SUCCESS && obj) {
- /* iterate through the ruv in csn order to find first master for which
- we can replay changes */
- rc = _cl5PositionCursorForReplay(consumerRID, consumerRuv, replica, obj, iterator, NULL);
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5CreateReplayIteratorEx - Could not find DB object for replica\n");
- }
- if (rc != CL5_SUCCESS) {
- if (obj)
- object_release(obj);
- /* release the thread */
- _cl5RemoveThread();
- }
- return rc;
- }
- /* cl5CreateReplayIterator is now a wrapper for cl5CreateReplayIteratorEx */
- int
- cl5CreateReplayIterator(Private_Repl_Protocol *prp, const RUV *consumerRuv, CL5ReplayIterator **iterator)
- {
- /* DBDB : I thought it should be possible to refactor this like so, but it seems to not work.
- Possibly the ordering of the calls is significant.
- ReplicaId consumerRID = agmt_get_consumer_rid ( prp->agmt, prp->conn );
- return cl5CreateReplayIteratorEx(prp,consumerRuv,iterator,consumerRID); */
- int rc;
- Object *replica;
- Object *obj = NULL;
- replica = prp->replica_object;
- if (replica == NULL || consumerRuv == NULL || iterator == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5CreateReplayIterator - Invalid parameter\n");
- return CL5_BAD_DATA;
- }
- *iterator = NULL;
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5CreateReplayIterator - Changelog is not initialized\n");
- return CL5_BAD_STATE;
- }
- /* make sure that changelog is open while operation is in progress */
- rc = _cl5AddThread();
- if (rc != CL5_SUCCESS)
- return rc;
- rc = _cl5GetDBFile(replica, &obj);
- if (rc == CL5_SUCCESS && obj) {
- /* iterate through the ruv in csn order to find first master for which
- we can replay changes */
- ReplicaId consumerRID = agmt_get_consumer_rid(prp->agmt, prp->conn);
- int continue_on_missing = agmt_get_ignoremissing(prp->agmt);
- int save_cont_miss = continue_on_missing;
- rc = _cl5PositionCursorForReplay(consumerRID, consumerRuv, replica, obj, iterator, &continue_on_missing);
- if (save_cont_miss == 1 && continue_on_missing == 0) {
- /* the option to continue once on a missing csn was used, rest */
- agmt_set_ignoremissing(prp->agmt, 0);
- }
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5CreateReplayIterator - Could not find DB object for replica\n");
- }
- if (rc != CL5_SUCCESS) {
- if (obj)
- object_release(obj);
- /* release the thread */
- _cl5RemoveThread();
- }
- return rc;
- }
- /* Name: cl5GetNextOperationToReplay
- Description: retrieves next operation to be sent to a particular consumer and
- that was created on a particular master. Consumer and master info
- is encoded in the iterator parameter that must be created by call
- to cl5CreateReplayIterator.
- Parameters: iterator - iterator that identifies next entry to retrieve;
- op - operation retrieved if function is successful
- Return: CL5_SUCCESS if function is successfull;
- CL5_BAD_DATA if invalid parameter is passed;
- CL5_NOTFOUND if end of iteration list is reached
- CL5_DB_ERROR if any other db error occured;
- CL5_BADFORMAT if data in db is of unrecognized format;
- CL5_MEMORY_ERROR if memory allocation fails.
- Algorithm: Iterate through changelog entries until a change is found that
- originated at the replica for which we are sending changes
- (based on the information in the iteration hash) and
- whose csn is larger than the csn already seen by the consumer
- If change originated at the replica not in the hash,
- determine whether we should send changes originated at the replica
- and add replica entry into the hash. We can send the changes for
- the replica if the current csn is smaller or equal to the csn
- in the consumer's ruv (if present) or if it is equal to the min
- csn in the supplier's ruv.
- */
- int
- cl5GetNextOperationToReplay(CL5ReplayIterator *iterator, CL5Entry *entry)
- {
- CSN *csn;
- char *key, *data;
- size_t keylen, datalen;
- char *agmt_name;
- int rc = 0;
- agmt_name = get_thread_private_agmtname();
- if (entry == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5GetNextOperationToReplay - %s - Invalid parameter passed\n", agmt_name);
- return CL5_BAD_DATA;
- }
- rc = clcache_get_next_change(iterator->clcache, (void **)&key, &keylen, (void **)&data, &datalen, &csn);
- if (rc == DB_NOTFOUND) {
- /*
- * Abort means we've figured out that we've passed the replica Min CSN,
- * so we should stop looping through the changelog
- */
- return CL5_NOTFOUND;
- }
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "cl5GetNextOperationToReplay - %s - "
- "Failed to read next entry; DB error %d\n",
- agmt_name, rc);
- return CL5_DB_ERROR;
- }
- if (is_cleaned_rid(csn_get_replicaid(csn))) {
- /*
- * This operation is from a deleted replica. During the cleanallruv task the
- * replicas are cleaned first before this instance is. This can cause the
- * server to basically do a full update over and over. So we have to watch for
- * this, and not send these operations out.
- */
- return CL5_IGNORE_OP;
- }
- /* there is an entry we should return */
- /* Callers of this function should cl5_operation_parameters_done(op) */
- if (0 != cl5DBData2Entry(data, datalen, entry)) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5GetNextOperationToReplay - %s - Failed to format entry rc=%d\n", agmt_name, rc);
- return rc;
- }
- return CL5_SUCCESS;
- }
- /* Name: cl5DestroyReplayIterator
- Description: destorys iterator
- Parameters: iterator - iterator to destory
- Return: none
- */
- void
- cl5DestroyReplayIterator(CL5ReplayIterator **iterator)
- {
- if (iterator == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5DestroyReplayIterator - Invalid iterator passed\n");
- return;
- }
- clcache_return_buffer(&(*iterator)->clcache);
- if ((*iterator)->fileObj) {
- object_release((*iterator)->fileObj);
- (*iterator)->fileObj = NULL;
- }
- /* release supplier's ruv */
- if ((*iterator)->supplierRuvObj) {
- object_release((*iterator)->supplierRuvObj);
- (*iterator)->supplierRuvObj = NULL;
- }
- slapi_ch_free((void **)iterator);
- /* this thread no longer holds a db reference, release it */
- _cl5RemoveThread();
- }
- /* Name: cl5DeleteOnClose
- Description: marks changelog for deletion when it is closed
- Parameters: flag; if flag = 1 then delete else don't
- Return: none
- */
- void
- cl5DeleteOnClose(PRBool rm)
- {
- s_cl5Desc.dbRmOnClose = rm;
- }
- /* Name: cl5GetDir
- Description: returns changelog directory
- Parameters: none
- Return: copy of the directory; caller needs to free the string
- */
- char *
- cl5GetDir()
- {
- if (s_cl5Desc.dbDir == NULL) {
- return NULL;
- } else {
- return slapi_ch_strdup(s_cl5Desc.dbDir);
- }
- }
- /* Name: cl5Exist
- Description: checks if a changelog exists in the specified directory;
- We consider changelog to exist if it contains the dbversion file.
- Parameters: clDir - directory to check
- Return: 1 - if changelog exists; 0 - otherwise
- */
- PRBool
- cl5Exist(const char *clDir)
- {
- char fName[MAXPATHLEN + 1];
- int rc;
- PR_snprintf(fName, MAXPATHLEN, "%s/%s", clDir, VERSION_FILE);
- rc = PR_Access(fName, PR_ACCESS_EXISTS);
- return (rc == PR_SUCCESS);
- }
- /* Name: cl5GetOperationCount
- Description: returns number of entries in the changelog. The changelog must be
- open for the value to be meaningful.
- Parameters: replica - optional parameter that specifies the replica whose operations
- we wish to count; if NULL all changelog entries are counted
- Return: number of entries in the changelog
- */
- int
- cl5GetOperationCount(Object *replica)
- {
- Object *obj;
- CL5DBFile *file;
- int count = 0;
- int rc;
- if (s_cl5Desc.dbState == CL5_STATE_NONE) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5GetOperationCount - Changelog is not initialized\n");
- return -1;
- }
- /* make sure that changelog is open while operation is in progress */
- rc = _cl5AddThread();
- if (rc != CL5_SUCCESS)
- return -1;
- if (replica == NULL) /* compute total entry count */
- {
- obj = objset_first_obj(s_cl5Desc.dbFiles);
- while (obj) {
- file = (CL5DBFile *)object_get_data(obj);
- PR_ASSERT(file);
- count += file->entryCount;
- obj = objset_next_obj(s_cl5Desc.dbFiles, obj);
- }
- } else /* return count for particular db */
- {
- /* select correct db file */
- rc = _cl5GetDBFile(replica, &obj);
- if (rc == CL5_SUCCESS) {
- file = (CL5DBFile *)object_get_data(obj);
- PR_ASSERT(file);
- count = file->entryCount;
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5GetOperationCount - Found DB object %p\n", obj);
- object_release(obj);
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5GetOperationCount - Could not get DB object for replica\n");
- count = 0;
- }
- }
- _cl5RemoveThread();
- return count;
- }
- /***** Helper Functions *****/
- /* this call happens under state lock */
- static int
- _cl5Open(const char *dir, const CL5DBConfig *config, CL5OpenMode openMode)
- {
- int rc;
- PR_ASSERT(dir);
- /* setup db configuration parameters */
- if (config) {
- _cl5SetDBConfig(config);
- } else {
- _cl5SetDefaultDBConfig();
- }
- /* initialize trimming */
- rc = _cl5TrimInit();
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5Open - Failed to initialize trimming\n");
- goto done;
- }
- /* create the changelog directory if it does not exist */
- rc = cl5CreateDirIfNeeded(dir);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5Open - Failed to create changelog directory (%s)\n", dir);
- goto done;
- }
- if (s_cl5Desc.dbDir) {
- slapi_ch_free_string(&s_cl5Desc.dbDir);
- }
- s_cl5Desc.dbDir = slapi_ch_strdup(dir);
- /* check database version */
- rc = _cl5CheckDBVersion();
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5Open - Invalid db version\n");
- goto done;
- }
- s_cl5Desc.dbOpenMode = openMode;
- /* initialize db environment */
- rc = _cl5AppInit();
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5Open - Failed to initialize db environment\n");
- goto done;
- }
- /* init the clcache */
- if ((clcache_init(&s_cl5Desc.dbEnv) != 0)) {
- rc = CL5_SYSTEM_ERROR;
- goto done;
- }
- /* open database files */
- rc = _cl5DBOpen();
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5Open - Failed to open changelog database\n");
- goto done;
- }
- done:;
- if (rc != CL5_SUCCESS) {
- _cl5Close();
- }
- return rc;
- }
- int
- cl5CreateDirIfNeeded(const char *dirName)
- {
- int rc;
- char buff[MAXPATHLEN + 1];
- char *t;
- PR_ASSERT(dirName);
- rc = PR_Access(dirName, PR_ACCESS_EXISTS);
- if (rc == PR_SUCCESS) {
- return CL5_SUCCESS;
- }
- /* directory does not exist - try to create */
- PL_strncpyz(buff, dirName, sizeof(buff) - 1);
- t = strchr(buff, '/');
- /* skip first slash */
- if (t) {
- t = strchr(t + 1, '/');
- }
- while (t) {
- *t = '\0';
- if (PR_Access(buff, PR_ACCESS_EXISTS) != PR_SUCCESS) {
- rc = PR_MkDir(buff, DIR_CREATE_MODE);
- if (rc != PR_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5CreateDirIfNeeded - Failed to create dir (%s); NSPR error - %d\n",
- dirName, PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- }
- *t++ = FILE_PATHSEP;
- t = strchr(t, '/');
- }
- /* last piece */
- rc = PR_MkDir(buff, DIR_CREATE_MODE);
- if (rc != PR_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5CreateDirIfNeeded - Failed to create dir; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- return CL5_SUCCESS;
- }
- static int
- _cl5AppInit(void)
- {
- int rc = -1; /* initialize to failure */
- DB_ENV *dbEnv = NULL;
- size_t pagesize = 0;
- int openflags = 0;
- char *cookie = NULL;
- Slapi_Backend *be = slapi_get_first_backend(&cookie);
- while (be) {
- rc = slapi_back_get_info(be, BACK_INFO_DBENV, (void **)&dbEnv);
- if ((LDAP_SUCCESS == rc) && dbEnv) {
- rc = slapi_back_get_info(be,
- BACK_INFO_INDEXPAGESIZE, (void **)&pagesize);
- if ((LDAP_SUCCESS == rc) && pagesize) {
- rc = slapi_back_get_info(be,
- BACK_INFO_DBENV_OPENFLAGS, (void **)&openflags);
- if (LDAP_SUCCESS == rc) {
- break; /* Successfully fetched */
- }
- }
- }
- be = slapi_get_next_backend(cookie);
- }
- slapi_ch_free((void **)&cookie);
- if (rc == 0 && dbEnv && pagesize) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5AppInit - Fetched backend dbEnv (%p)\n", dbEnv);
- s_cl5Desc.dbEnv = dbEnv;
- s_cl5Desc.dbEnvOpenFlags = openflags;
- s_cl5Desc.dbConfig.pageSize = pagesize;
- return CL5_SUCCESS;
- } else {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5AppInit - Failed to fetch backend dbenv (%p) and/or "
- "index page size (%lu)\n",
- dbEnv, (long unsigned int)pagesize);
- return CL5_DB_ERROR;
- }
- }
- static int
- _cl5DBOpen()
- {
- PRBool dbFile;
- PRDir *dir;
- PRDirEntry *entry = NULL;
- int rc = -1; /* initialize to failure */
- Object *replica;
- int count = 0;
- /* create lock that guarantees that each file is only added once to the list */
- s_cl5Desc.fileLock = PR_NewLock();
- /* loop over all db files and open them; file name format is cl5_<dbid>.<dbext> */
- dir = PR_OpenDir(s_cl5Desc.dbDir);
- if (dir == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DBOpen - Failed to open changelog dir; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- /* initialize set of db file objects */
- s_cl5Desc.dbFiles = objset_new(NULL);
- while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) {
- if (NULL == entry->name) {
- break;
- }
- dbFile = _cl5FileName2Replica(entry->name, &replica);
- if (dbFile) /* this is db file, not a log or dbversion; those are just skipped */
- {
- /* we only open files for existing replicas */
- if (replica) {
- rc = _cl5DBOpenFile(replica, NULL /* file object */,
- PR_FALSE /* check for duplicates */);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen - "
- "Error opening file %s\n",
- entry->name);
- return rc;
- }
- object_release(replica);
- count++;
- } else /* there is no matching replica for the file - remove */
- {
- char fullpathname[MAXPATHLEN];
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen - "
- "File %s has no matching replica; removing\n",
- entry->name);
- PR_snprintf(fullpathname, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, entry->name);
- rc = s_cl5Desc.dbEnv->dbremove(s_cl5Desc.dbEnv,
- 0, fullpathname, 0,
- DEFAULT_DB_ENV_OP_FLAGS);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DBOpen - Failed to remove (%s) file; "
- "libdb error - %d (%s)\n",
- fullpathname, rc, db_strerror(rc));
- if (PR_Delete(fullpathname) != PR_SUCCESS) {
- PRErrorCode prerr = PR_GetError();
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5DBOpen - Failed to remove (%s) file; "
- "nspr error - %d (%s)\n",
- fullpathname, prerr, slapd_pr_strerror(prerr));
- }
- }
- }
- }
- }
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen - "
- "Opened %d existing databases in %s\n",
- count, s_cl5Desc.dbDir);
- PR_CloseDir(dir);
- return CL5_SUCCESS;
- }
- /* this function assumes that the entry was validated
- using IsValidOperation
- Data in db format:
- ------------------
- <1 byte version><1 byte change_type><sizeof time_t time><null terminated csn>
- <null terminated uniqueid><null terminated targetdn>
- [<null terminated newrdn><1 byte deleteoldrdn>][<4 byte mod count><mod1><mod2>....]
- mod format:
- -----------
- <1 byte modop><null terminated attr name><4 byte value count>
- <4 byte value size><value1><4 byte value size><value2>
- */
- static int
- _cl5Entry2DBData(const CL5Entry *entry, char **data, PRUint32 *len)
- {
- int size = 1 /* version */ + 1 /* operation type */ + sizeof(time_t);
- char *pos;
- PRUint32 t;
- slapi_operation_parameters *op;
- LDAPMod **add_mods = NULL;
- char *rawDN = NULL;
- char s[CSN_STRSIZE];
- PR_ASSERT(entry && entry->op && data && len);
- op = entry->op;
- PR_ASSERT(op->target_address.uniqueid);
- /* compute size of the buffer needed to hold the data */
- size += CSN_STRSIZE;
- size += strlen(op->target_address.uniqueid) + 1;
- switch (op->operation_type) {
- case SLAPI_OPERATION_ADD:
- if (op->p.p_add.parentuniqueid)
- size += strlen(op->p.p_add.parentuniqueid) + 1;
- else
- size++; /* we just store NULL char */
- slapi_entry2mods(op->p.p_add.target_entry, &rawDN /* dn */, &add_mods);
- size += strlen(rawDN) + 1;
- /* Need larger buffer for the encrypted changelog */
- if (s_cl5Desc.clcrypt_handle) {
- size += (_cl5GetModsSize(add_mods) * (1 + BACK_CRYPT_OUTBUFF_EXTLEN));
- } else {
- size += _cl5GetModsSize(add_mods);
- }
- break;
- case SLAPI_OPERATION_MODIFY:
- size += REPL_GET_DN_LEN(&op->target_address) + 1;
- /* Need larger buffer for the encrypted changelog */
- if (s_cl5Desc.clcrypt_handle) {
- size += (_cl5GetModsSize(op->p.p_modify.modify_mods) * (1 + BACK_CRYPT_OUTBUFF_EXTLEN));
- } else {
- size += _cl5GetModsSize(op->p.p_modify.modify_mods);
- }
- break;
- case SLAPI_OPERATION_MODRDN:
- size += REPL_GET_DN_LEN(&op->target_address) + 1;
- /* 1 for deleteoldrdn */
- size += strlen(op->p.p_modrdn.modrdn_newrdn) + 2;
- if (REPL_GET_DN(&op->p.p_modrdn.modrdn_newsuperior_address))
- size += REPL_GET_DN_LEN(&op->p.p_modrdn.modrdn_newsuperior_address) + 1;
- else
- size++; /* for NULL char */
- if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)
- size += strlen(op->p.p_modrdn.modrdn_newsuperior_address.uniqueid) + 1;
- else
- size++; /* for NULL char */
- /* Need larger buffer for the encrypted changelog */
- if (s_cl5Desc.clcrypt_handle) {
- size += (_cl5GetModsSize(op->p.p_modrdn.modrdn_mods) * (1 + BACK_CRYPT_OUTBUFF_EXTLEN));
- } else {
- size += _cl5GetModsSize(op->p.p_modrdn.modrdn_mods);
- }
- break;
- case SLAPI_OPERATION_DELETE:
- size += REPL_GET_DN_LEN(&op->target_address) + 1;
- break;
- }
- /* allocate data buffer */
- (*data) = slapi_ch_malloc(size);
- if ((*data) == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5Entry2DBData - Failed to allocate data buffer\n");
- return CL5_MEMORY_ERROR;
- }
- /* fill in the data buffer */
- pos = *data;
- /* write a byte of version */
- (*pos) = V_5;
- pos++;
- /* write change type */
- (*pos) = (unsigned char)op->operation_type;
- pos++;
- /* write time */
- t = PR_htonl((PRUint32)entry->time);
- memcpy(pos, &t, sizeof(t));
- pos += sizeof(t);
- /* write csn */
- _cl5WriteString(csn_as_string(op->csn, PR_FALSE, s), &pos);
- /* write UniqueID */
- _cl5WriteString(op->target_address.uniqueid, &pos);
- /* figure out what else we need to write depending on the operation type */
- switch (op->operation_type) {
- case SLAPI_OPERATION_ADD:
- _cl5WriteString(op->p.p_add.parentuniqueid, &pos);
- _cl5WriteString(rawDN, &pos);
- _cl5WriteMods(add_mods, &pos);
- slapi_ch_free((void **)&rawDN);
- ldap_mods_free(add_mods, 1);
- break;
- case SLAPI_OPERATION_MODIFY:
- _cl5WriteString(REPL_GET_DN(&op->target_address), &pos);
- _cl5WriteMods(op->p.p_modify.modify_mods, &pos);
- break;
- case SLAPI_OPERATION_MODRDN:
- _cl5WriteString(REPL_GET_DN(&op->target_address), &pos);
- _cl5WriteString(op->p.p_modrdn.modrdn_newrdn, &pos);
- *pos = (PRUint8)op->p.p_modrdn.modrdn_deloldrdn;
- pos++;
- _cl5WriteString(REPL_GET_DN(&op->p.p_modrdn.modrdn_newsuperior_address), &pos);
- _cl5WriteString(op->p.p_modrdn.modrdn_newsuperior_address.uniqueid, &pos);
- _cl5WriteMods(op->p.p_modrdn.modrdn_mods, &pos);
- break;
- case SLAPI_OPERATION_DELETE:
- _cl5WriteString(REPL_GET_DN(&op->target_address), &pos);
- break;
- }
- /* (*len) != size in case encrypted */
- (*len) = pos - *data;
- if (*len > size) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5Entry2DBData - real len %d > estimated size %d\n",
- *len, size);
- return CL5_MEMORY_ERROR;
- }
- return CL5_SUCCESS;
- }
- /*
- Data in db format:
- ------------------
- <1 byte version><1 byte change_type><sizeof time_t time><null terminated dbid>
- <null terminated csn><null terminated uniqueid><null terminated targetdn>
- [<null terminated newrdn><1 byte deleteoldrdn>][<4 byte mod count><mod1><mod2>....]
- mod format:
- -----------
- <1 byte modop><null terminated attr name><4 byte value count>
- <4 byte value size><value1><4 byte value size><value2>
- */
- int
- cl5DBData2Entry(const char *data, PRUint32 len __attribute__((unused)), CL5Entry *entry)
- {
- int rc;
- PRUint8 version;
- char *pos = (char *)data;
- char *strCSN;
- PRUint32 thetime;
- slapi_operation_parameters *op;
- LDAPMod **add_mods;
- char *rawDN = NULL;
- char s[CSN_STRSIZE];
- PR_ASSERT(data && entry && entry->op);
- /* ONREPL - check that we do not go beyond the end of the buffer */
- /* read byte of version */
- version = (PRUint8)(*pos);
- if (version != V_5) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5DBData2Entry - Invalid data version\n");
- return CL5_BAD_FORMAT;
- }
- op = entry->op;
- pos += sizeof(version);
- /* read change type */
- op->operation_type = (PRUint8)(*pos);
- pos++;
- /* need to do the copy first, to skirt around alignment problems on
- certain architectures */
- memcpy((char *)&thetime, pos, sizeof(thetime));
- entry->time = (time_t)PR_ntohl(thetime);
- pos += sizeof(thetime);
- /* read csn */
- _cl5ReadString(&strCSN, &pos);
- if (op->csn == NULL || strcmp(strCSN, csn_as_string(op->csn, PR_FALSE, s)) != 0) {
- op->csn = csn_new_by_string(strCSN);
- }
- slapi_ch_free((void **)&strCSN);
- /* read UniqueID */
- _cl5ReadString(&op->target_address.uniqueid, &pos);
- /* figure out what else we need to read depending on the operation type */
- switch (op->operation_type) {
- case SLAPI_OPERATION_ADD:
- _cl5ReadString(&op->p.p_add.parentuniqueid, &pos);
- /* richm: need to free parentuniqueid */
- _cl5ReadString(&rawDN, &pos);
- op->target_address.sdn = slapi_sdn_new_dn_passin(rawDN);
- /* convert mods to entry */
- rc = _cl5ReadMods(&add_mods, &pos);
- slapi_mods2entry(&(op->p.p_add.target_entry), rawDN, add_mods);
- ldap_mods_free(add_mods, 1);
- break;
- case SLAPI_OPERATION_MODIFY:
- _cl5ReadString(&rawDN, &pos);
- op->target_address.sdn = slapi_sdn_new_dn_passin(rawDN);
- rc = _cl5ReadMods(&op->p.p_modify.modify_mods, &pos);
- break;
- case SLAPI_OPERATION_MODRDN:
- _cl5ReadString(&rawDN, &pos);
- op->target_address.sdn = slapi_sdn_new_dn_passin(rawDN);
- _cl5ReadString(&op->p.p_modrdn.modrdn_newrdn, &pos);
- op->p.p_modrdn.modrdn_deloldrdn = *pos;
- pos++;
- _cl5ReadString(&rawDN, &pos);
- op->p.p_modrdn.modrdn_newsuperior_address.sdn = slapi_sdn_new_dn_passin(rawDN);
- _cl5ReadString(&op->p.p_modrdn.modrdn_newsuperior_address.uniqueid, &pos);
- rc = _cl5ReadMods(&op->p.p_modrdn.modrdn_mods, &pos);
- break;
- case SLAPI_OPERATION_DELETE:
- _cl5ReadString(&rawDN, &pos);
- op->target_address.sdn = slapi_sdn_new_dn_passin(rawDN);
- rc = CL5_SUCCESS;
- break;
- default:
- rc = CL5_BAD_FORMAT;
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5DBData2Entry - Failed to format entry\n");
- break;
- }
- return rc;
- }
- /* thread management functions */
- static int
- _cl5DispatchDBThreads(void)
- {
- PRThread *pth = NULL;
- pth = PR_CreateThread(PR_USER_THREAD, (VFP)(void *)_cl5TrimMain,
- NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
- PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
- if (NULL == pth) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DispatchDBThreads - Failed to create trimming thread"
- "; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- return CL5_SUCCESS;
- }
- static int
- _cl5AddThread(void)
- {
- /* lock the state lock so that nobody can change the state
- while backup is in progress
- */
- slapi_rwlock_rdlock(s_cl5Desc.stLock);
- /* open changelog if it is not already open */
- if (s_cl5Desc.dbState != CL5_STATE_OPEN) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5AddThread - Invalid changelog state - %d\n", s_cl5Desc.dbState);
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- return CL5_BAD_STATE;
- }
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- /* increment global thread count to make sure that changelog does not close while
- backup is in progress */
- PR_AtomicIncrement(&s_cl5Desc.threadCount);
- return CL5_SUCCESS;
- }
- static void
- _cl5RemoveThread(void)
- {
- PR_ASSERT(s_cl5Desc.threadCount > 0);
- PR_AtomicDecrement(&s_cl5Desc.threadCount);
- }
- /* data conversion functions */
- static void
- _cl5WriteString(const char *str, char **buff)
- {
- if (str) {
- strcpy(*buff, str);
- (*buff) += strlen(str) + 1;
- } else /* just write NULL char */
- {
- (**buff) = '\0';
- (*buff)++;
- }
- }
- static void
- _cl5ReadString(char **str, char **buff)
- {
- if (str) {
- int len = strlen(*buff);
- if (len) {
- *str = slapi_ch_strdup(*buff);
- (*buff) += len + 1;
- } else /* just null char - skip it */
- {
- *str = NULL;
- (*buff)++;
- }
- } else /* just skip this string */
- {
- (*buff) += strlen(*buff) + 1;
- }
- }
- /* mods format:
- -----------
- <4 byte mods count><mod1><mod2>...
- mod format:
- -----------
- <1 byte modop><null terminated attr name><4 byte count>
- <4 byte size><value1><4 byte size><value2>...
- */
- static void
- _cl5WriteMods(LDAPMod **mods, char **buff)
- {
- PRInt32 i;
- char *mod_start;
- PRInt32 count = 0;
- if (mods == NULL)
- return;
- /* skip mods count */
- mod_start = (*buff) + sizeof(count);
- /* write mods*/
- for (i = 0; mods[i]; i++) {
- if (0 <= _cl5WriteMod(mods[i], &mod_start)) {
- count++;
- }
- }
- count = PR_htonl(count);
- memcpy(*buff, &count, sizeof(count));
- (*buff) = mod_start;
- }
- /*
- * return values:
- * positive: no need to encrypt && succeeded to write a mod
- * 0: succeeded to encrypt && write a mod
- * netative: failed to encrypt && no write to the changelog
- */
- static int
- _cl5WriteMod(LDAPMod *mod, char **buff)
- {
- char *orig_pos;
- char *pos;
- PRInt32 count;
- struct berval *bv;
- struct berval *encbv;
- struct berval *bv_to_use;
- Slapi_Mod smod;
- int rc = -1;
- if (NULL == mod) {
- return rc;
- }
- if (SLAPD_UNHASHED_PW_NOLOG == slapi_config_get_unhashed_pw_switch()) {
- if (0 == strcasecmp(mod->mod_type, PSEUDO_ATTR_UNHASHEDUSERPASSWORD)) {
- /* If nsslapd-unhashed-pw-switch == nolog, skip writing it to cl. */
- return rc;
- }
- }
- slapi_mod_init_byref(&smod, mod);
- orig_pos = pos = *buff;
- /* write mod op */
- *pos = (PRUint8)slapi_mod_get_operation(&smod);
- pos++;
- /* write attribute name */
- _cl5WriteString(slapi_mod_get_type(&smod), &pos);
- /* write value count */
- count = PR_htonl(slapi_mod_get_num_values(&smod));
- memcpy(pos, &count, sizeof(count));
- pos += sizeof(PRInt32);
- /* if the mod has no values, eg delete attr or replace attr without values
- * do not reset buffer
- */
- rc = 0;
- bv = slapi_mod_get_first_value(&smod);
- while (bv) {
- encbv = NULL;
- rc = clcrypt_encrypt_value(s_cl5Desc.clcrypt_handle,
- bv, &encbv);
- if (rc > 0) {
- /* no encryption needed. use the original bv */
- bv_to_use = bv;
- } else if ((0 == rc) && encbv) {
- /* successfully encrypted. use the encrypted bv */
- bv_to_use = encbv;
- } else { /* failed */
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5WriteMod - Encrypting \"%s: %s\" failed\n",
- slapi_mod_get_type(&smod), bv->bv_val);
- bv_to_use = NULL;
- rc = -1;
- break;
- }
- if (bv_to_use) {
- _cl5WriteBerval(bv_to_use, &pos);
- }
- slapi_ch_bvfree(&encbv);
- bv = slapi_mod_get_next_value(&smod);
- }
- if (rc < 0) {
- (*buff) = orig_pos;
- } else {
- (*buff) = pos;
- }
- slapi_mod_done(&smod);
- return rc;
- }
- /* mods format:
- -----------
- <4 byte mods count><mod1><mod2>...
- mod format:
- -----------
- <1 byte modop><null terminated attr name><4 byte count>
- {<4 byte size><value1><4 byte size><value2>... ||
- <null terminated str1> <null terminated str2>...}
- */
- static int
- _cl5ReadMods(LDAPMod ***mods, char **buff)
- {
- char *pos = *buff;
- int i;
- int rc;
- PRInt32 mod_count;
- Slapi_Mods smods;
- Slapi_Mod smod;
- /* need to copy first, to skirt around alignment problems on certain
- architectures */
- memcpy((char *)&mod_count, *buff, sizeof(mod_count));
- mod_count = PR_ntohl(mod_count);
- pos += sizeof(mod_count);
- slapi_mods_init(&smods, mod_count);
- for (i = 0; i < mod_count; i++) {
- rc = _cl5ReadMod(&smod, &pos);
- if (rc != CL5_SUCCESS) {
- slapi_mods_done(&smods);
- return rc;
- }
- slapi_mods_add_smod(&smods, &smod);
- }
- *buff = pos;
- *mods = slapi_mods_get_ldapmods_passout(&smods);
- slapi_mods_done(&smods);
- return CL5_SUCCESS;
- }
- static int
- _cl5ReadMod(Slapi_Mod *smod, char **buff)
- {
- char *pos = *buff;
- int i;
- PRInt32 val_count;
- char *type;
- int op;
- struct berval bv;
- struct berval *decbv;
- struct berval *bv_to_use;
- int rc = 0;
- op = (*pos) & 0x000000FF;
- pos++;
- _cl5ReadString(&type, &pos);
- /* need to do the copy first, to skirt around alignment problems on
- certain architectures */
- memcpy((char *)&val_count, pos, sizeof(val_count));
- val_count = PR_ntohl(val_count);
- pos += sizeof(PRInt32);
- slapi_mod_init(smod, val_count);
- slapi_mod_set_operation(smod, op | LDAP_MOD_BVALUES);
- slapi_mod_set_type(smod, type);
- slapi_ch_free((void **)&type);
- for (i = 0; i < val_count; i++) {
- _cl5ReadBerval(&bv, &pos);
- decbv = NULL;
- rc = 0;
- rc = clcrypt_decrypt_value(s_cl5Desc.clcrypt_handle,
- &bv, &decbv);
- if (rc > 0) {
- /* not encrypted. use the original bv */
- bv_to_use = &bv;
- } else if ((0 == rc) && decbv) {
- /* successfully decrypted. use the decrypted bv */
- bv_to_use = decbv;
- } else { /* failed */
- char encstr[128];
- char *encend = encstr + 128;
- char *ptr;
- int i;
- for (i = 0, ptr = encstr; (i < bv.bv_len) && (ptr < encend - 4);
- i++, ptr += 3) {
- sprintf(ptr, "%x", 0xff & bv.bv_val[i]);
- }
- if (ptr >= encend - 4) {
- sprintf(ptr, "...");
- ptr += 3;
- }
- *ptr = '\0';
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5ReadMod - Decrypting \"%s: %s\" failed\n",
- slapi_mod_get_type(smod), encstr);
- bv_to_use = NULL;
- }
- if (bv_to_use) {
- slapi_mod_add_value(smod, bv_to_use);
- }
- slapi_ch_bvfree(&decbv);
- slapi_ch_free((void **)&bv.bv_val);
- }
- (*buff) = pos;
- return CL5_SUCCESS;
- }
- static int
- _cl5GetModsSize(LDAPMod **mods)
- {
- int size;
- int i;
- if (mods == NULL)
- return 0;
- size = sizeof(PRInt32);
- for (i = 0; mods[i]; i++) {
- size += _cl5GetModSize(mods[i]);
- }
- return size;
- }
- static int
- _cl5GetModSize(LDAPMod *mod)
- {
- int size;
- int i;
- size = 1 + strlen(mod->mod_type) + 1 + sizeof(mod->mod_op);
- i = 0;
- if (mod->mod_op & LDAP_MOD_BVALUES) /* values are in binary form */
- {
- while (mod->mod_bvalues != NULL && mod->mod_bvalues[i] != NULL) {
- size += (PRInt32)mod->mod_bvalues[i]->bv_len + sizeof(PRInt32);
- i++;
- }
- } else /* string data */
- {
- PR_ASSERT(0); /* ggood string values should never be used in the server */
- }
- return size;
- }
- static void
- _cl5ReadBerval(struct berval *bv, char **buff)
- {
- PRUint32 length = 0;
- PRUint32 net_length = 0;
- PR_ASSERT(bv && buff);
- /***PINAKI need to do the copy first, to skirt around alignment problems on
- certain architectures */
- /* DBDB : struct berval.bv_len is defined as unsigned long
- * But code here expects it to be 32-bits in size.
- * On 64-bit machines, this is not the case.
- * I changed the code to consistently use 32-bit (4-byte)
- * values on the encoded side. This means that it's
- * possible to generate a huge berval that will not
- * be encoded properly. However, this seems unlikely
- * to happen in reality, and I felt that retaining the
- * old on-disk format for the changely in the 64-bit
- * version of the server was important.
- */
- memcpy((char *)&net_length, *buff, sizeof(net_length));
- length = PR_ntohl(net_length);
- *buff += sizeof(net_length);
- bv->bv_len = length;
- if (bv->bv_len > 0) {
- bv->bv_val = slapi_ch_malloc(bv->bv_len);
- memcpy(bv->bv_val, *buff, bv->bv_len);
- *buff += bv->bv_len;
- } else {
- bv->bv_val = NULL;
- }
- }
- static void
- _cl5WriteBerval(struct berval *bv, char **buff)
- {
- PRUint32 length = 0;
- PRUint32 net_length = 0;
- length = (PRUint32)bv->bv_len;
- net_length = PR_htonl(length);
- memcpy(*buff, &net_length, sizeof(net_length));
- *buff += sizeof(net_length);
- memcpy(*buff, bv->bv_val, length);
- *buff += length;
- }
- /* data format: <value count> <value size> <value> <value size> <value> ..... */
- static int
- _cl5ReadBervals(struct berval ***bv, char **buff, unsigned int size __attribute__((unused)))
- {
- PRInt32 count;
- int i;
- char *pos;
- PR_ASSERT(bv && buff);
- /* ONREPL - need to check that we don't go beyond the end of the buffer */
- pos = *buff;
- memcpy((char *)&count, pos, sizeof(count));
- count = PR_htonl(count);
- pos += sizeof(count);
- /* allocate bervals */
- *bv = (struct berval **)slapi_ch_malloc((count + 1) * sizeof(struct berval *));
- if (*bv == NULL) {
- return CL5_MEMORY_ERROR;
- }
- for (i = 0; i < count; i++) {
- (*bv)[i] = (struct berval *)slapi_ch_malloc(sizeof(struct berval));
- if ((*bv)[i] == NULL) {
- ber_bvecfree(*bv);
- return CL5_MEMORY_ERROR;
- }
- _cl5ReadBerval((*bv)[i], &pos);
- }
- (*bv)[count] = NULL;
- *buff = pos;
- return CL5_SUCCESS;
- }
- /* data format: <value count> <value size> <value> <value size> <value> ..... */
- static int
- _cl5WriteBervals(struct berval **bv, char **buff, u_int32_t *size)
- {
- PRInt32 count, net_count;
- char *pos;
- int i;
- PR_ASSERT(bv && buff && size);
- /* compute number of values and size of the buffer to hold them */
- *size = sizeof(count);
- for (count = 0; bv[count]; count++) {
- *size += (u_int32_t)(sizeof(PRInt32) + (PRInt32)bv[count]->bv_len);
- }
- /* allocate buffer */
- *buff = (char *)slapi_ch_malloc(*size);
- if (*buff == NULL) {
- *size = 0;
- return CL5_MEMORY_ERROR;
- }
- /* fill the buffer */
- pos = *buff;
- net_count = PR_htonl(count);
- memcpy(pos, &net_count, sizeof(net_count));
- pos += sizeof(net_count);
- for (i = 0; i < count; i++) {
- _cl5WriteBerval(bv[i], &pos);
- }
- return CL5_SUCCESS;
- }
- /* upgrade from db33 to db41
- * 1. Run recovery on the database environment using the DB_ENV->open method
- * 2. Remove any Berkeley DB environment using the DB_ENV->remove method
- * 3. Remove any Berkeley DB transaction log files
- * 4. extention .db3 -> .db4
- */
- static int
- _cl5UpgradeMajor(char *fromVersion, char *toVersion)
- {
- PRDir *dir = NULL;
- PRDirEntry *entry = NULL;
- DB *thisdb = NULL;
- CL5OpenMode backup;
- int rc = 0;
- backup = s_cl5Desc.dbOpenMode;
- s_cl5Desc.dbOpenMode = CL5_OPEN_CLEAN_RECOVER;
- /* CL5_OPEN_CLEAN_RECOVER does 1 and 2 */
- rc = _cl5AppInit();
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5UpgradeMajor - Failed to open the db env\n");
- return rc;
- }
- s_cl5Desc.dbOpenMode = backup;
- dir = PR_OpenDir(s_cl5Desc.dbDir);
- if (dir == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5UpgradeMajor - Failed to open changelog dir %s; NSPR error - %d\n",
- s_cl5Desc.dbDir, PR_GetError());
- goto out;
- }
- while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) {
- if (NULL == entry->name) {
- break;
- }
- if (_cl5FileEndsWith(entry->name, DB_EXTENSION_DB3) ||
- _cl5FileEndsWith(entry->name, DB_EXTENSION_DB4)) {
- char oName[MAXPATHLEN + 1];
- char nName[MAXPATHLEN + 1];
- char *p = NULL;
- char c;
- int baselen = 0;
- PR_snprintf(oName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, entry->name);
- p = strstr(oName, DB_EXTENSION_DB3);
- if (NULL == p) {
- p = strstr(oName, DB_EXTENSION_DB4);
- if (NULL == p) {
- continue;
- }
- }
- /* db->rename closes DB; need to create every time */
- rc = db_create(&thisdb, s_cl5Desc.dbEnv, 0);
- if (0 != rc) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5UpgradeMajor - Failed to get db handle\n");
- goto out;
- }
- baselen = p - oName;
- c = *p;
- *p = '\0';
- PR_snprintf(nName, MAXPATHLEN + 1, "%s", oName);
- PR_snprintf(nName + baselen, MAXPATHLEN + 1 - baselen, "%s", DB_EXTENSION);
- *p = c;
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5UpgradeMajor - Renaming %s to %s\n", oName, nName);
- rc = thisdb->rename(thisdb, (const char *)oName, NULL /* subdb */,
- (const char *)nName, 0);
- if (rc != PR_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5UpgradeMajor - Failed to rename file (%s -> %s); "
- "db error - %d %s\n",
- oName, nName, rc, db_strerror(rc));
- break;
- }
- }
- }
- /* update the version file */
- _cl5WriteDBVersion();
- slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name_cl,
- "_cl5UpgradeMajor - Upgrading from %s to %s is successfully done (%s)\n",
- fromVersion, toVersion, s_cl5Desc.dbDir);
- out:
- if (NULL != dir) {
- PR_CloseDir(dir);
- }
- return rc;
- }
- /* upgrade from db41 -> db42 -> db43 -> db44 -> db45
- * 1. Run recovery on the database environment using the DB_ENV->open method
- * 2. Remove any Berkeley DB environment using the DB_ENV->remove method
- * 3. Remove any Berkeley DB transaction log files
- */
- static int
- _cl5UpgradeMinor(char *fromVersion, char *toVersion)
- {
- CL5OpenMode backup;
- int rc = 0;
- backup = s_cl5Desc.dbOpenMode;
- s_cl5Desc.dbOpenMode = CL5_OPEN_CLEAN_RECOVER;
- /* CL5_OPEN_CLEAN_RECOVER does 1 and 2 */
- rc = _cl5AppInit();
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5UpgradeMinor - Failed to open the db env\n");
- return rc;
- }
- s_cl5Desc.dbOpenMode = backup;
- /* update the version file */
- _cl5WriteDBVersion();
- slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name_cl,
- "_cl5UpgradeMinor - Upgrading from %s to %s is successfully done (%s)\n",
- fromVersion, toVersion, s_cl5Desc.dbDir);
- return rc;
- }
- static int
- _cl5CheckDBVersion(void)
- {
- char clVersion[VERSION_SIZE + 1];
- char dbVersion[VERSION_SIZE + 1];
- int rc;
- if (!cl5Exist(s_cl5Desc.dbDir)) {
- /* this is new changelog - write DB version and guardian file */
- rc = _cl5WriteDBVersion();
- } else {
- char *versionp = NULL;
- char *versionendp = NULL;
- char *dotp = NULL;
- int dbmajor = 0;
- int dbminor = 0;
- PR_snprintf(clVersion, VERSION_SIZE, "%s/%d.%d/%s",
- BDB_IMPL, DB_VERSION_MAJOR, DB_VERSION_MINOR, BDB_REPLPLUGIN);
- rc = _cl5ReadDBVersion(s_cl5Desc.dbDir, dbVersion, sizeof(dbVersion));
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5CheckDBVersion - Invalid dbversion\n");
- rc = CL5_BAD_DBVERSION;
- goto bailout;
- }
- versionendp = dbVersion + strlen(dbVersion);
- /* get the version number */
- /* old DBVERSION string: CL5_TYPE/REPL_PLUGIN_NAME/#.# */
- if (PL_strncmp(dbVersion, CL5_TYPE, strlen(CL5_TYPE)) == 0) {
- versionp = strrchr(dbVersion, '/');
- }
- /* new DBVERSION string: bdb/#.#/libreplication-plugin */
- else if (PL_strncmp(dbVersion, BDB_IMPL, strlen(BDB_IMPL)) == 0) {
- versionp = strchr(dbVersion, '/');
- }
- if (NULL == versionp || versionp == versionendp) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5CheckDBVersion - Invalid dbversion: %s\n", dbVersion);
- rc = CL5_BAD_DBVERSION;
- goto bailout;
- }
- dotp = strchr(++versionp, '.');
- if (NULL != dotp) {
- *dotp = '\0';
- dbmajor = strtol(versionp, (char **)NULL, 10);
- dbminor = strtol(dotp + 1, (char **)NULL, 10);
- *dotp = '.';
- } else {
- dbmajor = strtol(versionp, (char **)NULL, 10);
- }
- if (dbmajor < DB_VERSION_MAJOR) {
- /* upgrade */
- rc = _cl5UpgradeMajor(dbVersion, clVersion);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5CheckDBVersion - Upgrade %s -> %s failed\n",
- dbVersion, clVersion);
- rc = CL5_BAD_DBVERSION;
- }
- } else if (dbminor < DB_VERSION_MINOR) {
- /* minor upgrade */
- rc = _cl5UpgradeMinor(dbVersion, clVersion);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5CheckDBVersion - Upgrade %s -> %s failed\n",
- dbVersion, clVersion);
- rc = CL5_BAD_DBVERSION;
- }
- }
- }
- bailout:
- return rc;
- }
- static int
- _cl5ReadDBVersion(const char *dir, char *clVersion, int buflen)
- {
- int rc;
- PRFileDesc *file;
- char fName[MAXPATHLEN + 1];
- char buff[BUFSIZ];
- PRInt32 size;
- char *tok;
- char *iter = NULL;
- if (clVersion) {
- clVersion[0] = '\0';
- }
- PR_snprintf(fName, MAXPATHLEN, "%s/%s", dir, VERSION_FILE);
- file = PR_Open(fName, PR_RDONLY, 777);
- if (file == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5ReadDBVersion - Failed to open DBVERSION; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- size = slapi_read_buffer(file, buff, BUFSIZ);
- if (size < 0) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5ReadDBVersion - Failed to read DBVERSION; NSPR error - %d\n",
- PR_GetError());
- PR_Close(file);
- return CL5_SYSTEM_ERROR;
- }
- /* parse the data */
- buff[size] = '\0';
- tok = ldap_utf8strtok_r(buff, "\n", &iter);
- if (tok) {
- if (clVersion) {
- PL_strncpyz(clVersion, tok, buflen);
- }
- }
- rc = PR_Close(file);
- if (rc != PR_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5ReadDBVersion - Failed to close DBVERSION; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- return CL5_SUCCESS;
- }
- static int
- _cl5WriteDBVersion(void)
- {
- int rc;
- PRFileDesc *file;
- char fName[MAXPATHLEN + 1];
- char clVersion[VERSION_SIZE + 1];
- PRInt32 len, size;
- PR_snprintf(fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, VERSION_FILE);
- file = PR_Open(fName, PR_WRONLY | PR_CREATE_FILE | PR_TRUNCATE,
- s_cl5Desc.dbConfig.fileMode);
- if (file == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5WriteDBVersion - Failed to open DBVERSION; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- /* write changelog version */
- PR_snprintf(clVersion, VERSION_SIZE, "%s/%d.%d/%s\n",
- BDB_IMPL, DB_VERSION_MAJOR, DB_VERSION_MINOR, BDB_REPLPLUGIN);
- len = strlen(clVersion);
- size = slapi_write_buffer(file, clVersion, len);
- if (size != len) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5WriteDBVersion - Failed to write DBVERSION; NSPR error - %d\n",
- PR_GetError());
- PR_Close(file);
- return CL5_SYSTEM_ERROR;
- }
- rc = PR_Close(file);
- if (rc != PR_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5WriteDBVersion - Failed to close DBVERSION; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- return CL5_SUCCESS;
- }
- /* must be called under the state lock */
- static void
- _cl5Close(void)
- {
- PRIntervalTime interval;
- if (s_cl5Desc.dbState != CL5_STATE_CLOSED) /* Don't try to close twice */
- {
- /* cl5Close() set the state flag to CL5_STATE_CLOSING, which should
- trigger all of the db housekeeping threads to exit, and which will
- eventually cause no new update threads to start - so we wait here
- for those other threads to finish before we proceed */
- interval = PR_MillisecondsToInterval(100);
- while (s_cl5Desc.threadCount > 0) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5Close -Waiting for threads to exit: %d thread(s) still active\n",
- s_cl5Desc.threadCount);
- DS_Sleep(interval);
- }
- /* There should now be no threads accessing any of the changelog databases -
- it is safe to remove those databases */
- _cl5DBClose();
- /* cleanup trimming */
- _cl5TrimCleanup();
- /* remove changelog if requested */
- if (s_cl5Desc.dbRmOnClose) {
- if (_cl5Delete(s_cl5Desc.dbDir, 1) != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5Close - Failed to remove changelog\n");
- }
- s_cl5Desc.dbRmOnClose = PR_FALSE;
- }
- slapi_ch_free((void **)&s_cl5Desc.dbDir);
- memset(&s_cl5Desc.dbConfig, 0, sizeof(s_cl5Desc.dbConfig));
- s_cl5Desc.fatalError = PR_FALSE;
- s_cl5Desc.threadCount = 0;
- s_cl5Desc.dbOpenMode = CL5_OPEN_NONE;
- }
- }
- static void
- _cl5DBClose(void)
- {
- if (NULL != s_cl5Desc.dbFiles) {
- Object *obj;
- for (obj = objset_first_obj(s_cl5Desc.dbFiles); obj;
- obj = objset_next_obj(s_cl5Desc.dbFiles, obj)) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DBClose - Deleting DB object %p\n", obj);
- }
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DBClose - Closing databases in %s\n", s_cl5Desc.dbDir);
- objset_delete(&s_cl5Desc.dbFiles);
- }
- if (NULL != s_cl5Desc.fileLock) {
- PR_DestroyLock(s_cl5Desc.fileLock);
- s_cl5Desc.fileLock = NULL;
- }
- }
- /* see if the given file is a changelog db file */
- static int
- _cl5IsDbFile(const char *fname)
- {
- if (!fname || !*fname) {
- return 0;
- }
- if (!strcmp(fname, VERSION_FILE)) {
- return 1;
- }
- if (_cl5FileEndsWith(fname, DB_EXTENSION)) {
- return 1;
- }
- return 0; /* not a filename we recognize as being associated with the db */
- }
- /* state lock must be locked */
- static int
- _cl5Delete(const char *clDir, int rmDir)
- {
- PRDir *dir;
- char filename[MAXPATHLEN + 1];
- PRDirEntry *entry = NULL;
- int rc;
- int dirisempty = 1;
- /* remove all files in the directory and the directory */
- dir = PR_OpenDir(clDir);
- if (dir == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5Delete - Failed to open changelog dir; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) {
- if (NULL == entry->name) {
- break;
- }
- if (!_cl5IsDbFile(entry->name)) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5Delete - Skipping file [%s/%s] because it is not a changelogdb file.\n",
- clDir, entry->name);
- dirisempty = 0; /* skipped at least one file - dir not empty */
- continue;
- }
- PR_snprintf(filename, MAXPATHLEN, "%s/%s", clDir, entry->name);
- /* _cl5Delete deletes the whole changelog directory with all the files
- * underneath. Thus, we can just remove them physically. */
- if (0 == strcmp(entry->name, VERSION_FILE)) {
- /* DBVERSION */
- rc = PR_Delete(filename) != PR_SUCCESS;
- if (PR_SUCCESS != rc) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5Delete - Failed to remove \"%s\"; NSPR error - %d\n",
- filename, PR_GetError());
- }
- } else {
- /* DB files */
- rc = s_cl5Desc.dbEnv->dbremove(s_cl5Desc.dbEnv, 0, filename, 0,
- DEFAULT_DB_ENV_OP_FLAGS);
- if (rc) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5Delete - Failed to remove \"%s\"; "
- "libdb error - %d (%s)\n",
- filename, rc, db_strerror(rc));
- }
- }
- }
- rc = PR_CloseDir(dir);
- if (rc != PR_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5Delete - Failed to close changelog dir (%s); NSPR error - %d\n",
- clDir, PR_GetError());
- return CL5_SYSTEM_ERROR;
- }
- if (rmDir && dirisempty) {
- rc = PR_RmDir(clDir);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5Delete - Failed to remove changelog dir (%s); errno = %d\n",
- clDir, errno);
- return CL5_SYSTEM_ERROR;
- }
- } else if (rmDir && !dirisempty) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5Delete - Changelog dir (%s) is not empty - cannot remove\n",
- clDir);
- }
- /* invalidate the clcache */
- clcache_destroy();
- return CL5_SUCCESS;
- }
- static void
- _cl5SetDefaultDBConfig(void)
- {
- s_cl5Desc.dbConfig.fileMode = FILE_CREATE_MODE;
- }
- static void
- _cl5SetDBConfig(const CL5DBConfig *config)
- {
- /* s_cl5Desc.dbConfig.pageSize is retrieved from backend */
- /* Some other configuration parameters are hardcoded... */
- s_cl5Desc.dbConfig.fileMode = FILE_CREATE_MODE;
- }
- /* Trimming helper functions */
- static int
- _cl5TrimInit(void)
- {
- /* just create the lock while we are singlethreaded */
- s_cl5Desc.dbTrim.lock = PR_NewLock();
- if (s_cl5Desc.dbTrim.lock == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5InitTrimming - Failed to create lock; NSPR error - %d\n",
- PR_GetError());
- return CL5_SYSTEM_ERROR;
- } else {
- return CL5_SUCCESS;
- }
- }
- static void
- _cl5TrimCleanup(void)
- {
- if (s_cl5Desc.dbTrim.lock)
- PR_DestroyLock(s_cl5Desc.dbTrim.lock);
- memset(&s_cl5Desc.dbTrim, 0, sizeof(s_cl5Desc.dbTrim));
- }
- static int
- _cl5TrimMain(void *param __attribute__((unused)))
- {
- time_t timePrev = slapi_current_utc_time();
- time_t timeCompactPrev = slapi_current_utc_time();
- time_t timeNow;
- PR_AtomicIncrement(&s_cl5Desc.threadCount);
- while (s_cl5Desc.dbState != CL5_STATE_CLOSING) {
- timeNow = slapi_current_utc_time();
- if (timeNow - timePrev >= s_cl5Desc.dbTrim.trimInterval) {
- /* time to trim */
- timePrev = timeNow;
- _cl5DoTrimming();
- }
- if ((s_cl5Desc.dbTrim.compactInterval > 0) &&
- (timeNow - timeCompactPrev >= s_cl5Desc.dbTrim.compactInterval)) {
- /* time to trim */
- timeCompactPrev = timeNow;
- _cl5CompactDBs();
- }
- if (NULL == s_cl5Desc.clLock) {
- /* most likely, emergency */
- break;
- }
- PR_Lock(s_cl5Desc.clLock);
- PR_WaitCondVar(s_cl5Desc.clCvar, PR_SecondsToInterval(s_cl5Desc.dbTrim.trimInterval));
- PR_Unlock(s_cl5Desc.clLock);
- }
- PR_AtomicDecrement(&s_cl5Desc.threadCount);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5TrimMain - Exiting\n");
- return 0;
- }
- /*
- * We remove an entry if it has been replayed to all consumers and the number
- * of entries in the changelog is larger than maxEntries or age of the entry
- * is larger than maxAge. Also we can't purge entries which correspond to max
- * csns in the supplier's ruv. Here is a example where we can get into trouble:
- *
- * The server is setup with time based trimming and no consumer's
- * At some point all the entries are trimmed from the changelog.
- * At a later point a consumer is added and initialized online.
- * Then a change is made on the supplier.
- * To update the consumer, the supplier would attempt to locate the last
- * change sent to the consumer in the changelog and will fail because the
- * change was removed.
- */
- static void
- _cl5DoTrimming(void)
- {
- Object *obj;
- long numToTrim;
- PR_Lock(s_cl5Desc.dbTrim.lock);
- /*
- * We are trimming all the changelogs. We trim file by file which
- * means that some files will be trimmed more often than other. We
- * might have to fix that by, for example, randomizing the starting
- * point.
- */
- obj = objset_first_obj(s_cl5Desc.dbFiles);
- while (obj && _cl5CanTrim((time_t)0, &numToTrim)) {
- _cl5TrimFile(obj, &numToTrim);
- obj = objset_next_obj(s_cl5Desc.dbFiles, obj);
- }
- if (obj)
- object_release(obj);
- PR_Unlock(s_cl5Desc.dbTrim.lock);
- return;
- }
- /*
- * We are purging a changelog after a cleanAllRUV task. Find the specific
- * changelog for the backend that is being cleaned, and purge all the records
- * with the cleaned rid.
- */
- static void
- _cl5DoPurging(cleanruv_purge_data *purge_data)
- {
- ReplicaId rid = purge_data->cleaned_rid;
- const Slapi_DN *suffix_sdn = purge_data->suffix_sdn;
- const char *replName = purge_data->replName;
- char *replGen = purge_data->replGen;
- char *fileName;
- Object *obj;
- PR_Lock(s_cl5Desc.dbTrim.lock);
- fileName = _cl5MakeFileName(replName, replGen);
- obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
- if (obj) {
- /* We found our changelog, now purge it */
- _cl5PurgeRID(obj, rid);
- object_release(obj);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DoPurging - Purged rid (%d) from suffix (%s)\n",
- rid, slapi_sdn_get_dn(suffix_sdn));
- } else {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5DoPurging - Purge rid (%d) failed to find changelog file (%s) for suffix (%s)\n",
- rid, fileName, slapi_sdn_get_dn(suffix_sdn));
- }
- PR_Unlock(s_cl5Desc.dbTrim.lock);
- return;
- }
- /* clear free page files to reduce changelog */
- static void
- _cl5CompactDBs(void)
- {
- int rc;
- Object *fileObj = NULL;
- CL5DBFile *dbFile = NULL;
- DB *db = NULL;
- DB_TXN *txnid = NULL;
- DB_COMPACT c_data = {0};
- PR_Lock(s_cl5Desc.dbTrim.lock);
- rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0);
- if (rc) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5CompactDBs - Failed to begin transaction; db error - %d %s\n",
- rc, db_strerror(rc));
- goto bail;
- }
- for (fileObj = objset_first_obj(s_cl5Desc.dbFiles);
- fileObj;
- fileObj = objset_next_obj(s_cl5Desc.dbFiles, fileObj)) {
- dbFile = (CL5DBFile *)object_get_data(fileObj);
- if (!dbFile) {
- continue;
- }
- db = dbFile->db;
- rc = db->compact(db, txnid, NULL /*start*/, NULL /*stop*/,
- &c_data, DB_FREE_SPACE, NULL /*end*/);
- if (rc) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5CompactDBs - Failed to compact %s; db error - %d %s\n",
- dbFile->replName, rc, db_strerror(rc));
- goto bail;
- }
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5CompactDBs - %s - %d pages freed\n",
- dbFile->replName, c_data.compact_pages_free);
- }
- bail:
- if (fileObj) {
- object_release(fileObj);
- }
- if (rc) {
- rc = TXN_ABORT(txnid);
- if (rc) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5CompactDBs - Failed to abort transaction; db error - %d %s\n",
- rc, db_strerror(rc));
- }
- } else {
- rc = TXN_COMMIT(txnid);
- if (rc) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5CompactDBs - Failed to commit transaction; db error - %d %s\n",
- rc, db_strerror(rc));
- }
- }
- PR_Unlock(s_cl5Desc.dbTrim.lock);
- return;
- }
- /*
- * If the rid is not set it is the very first iteration of the changelog.
- * If the rid is set, we are doing another pass, and we have a key as our
- * starting point.
- */
- static int
- _cl5PurgeGetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key)
- {
- DBC *cursor = NULL;
- DBT data = {0};
- CL5Iterator *it;
- CL5DBFile *file;
- int rc;
- file = (CL5DBFile *)object_get_data(obj);
- /* create cursor */
- rc = file->db->cursor(file->db, txnid, &cursor, 0);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5PurgeGetFirstEntry - Failed to create cursor; db error - %d %s\n", rc, db_strerror(rc));
- rc = CL5_DB_ERROR;
- goto done;
- }
- key->flags = DB_DBT_MALLOC;
- data.flags = DB_DBT_MALLOC;
- while ((rc = cursor->c_get(cursor, key, &data, rid ? DB_SET : DB_NEXT)) == 0) {
- /* skip service entries on the first pass (rid == 0)*/
- if (!rid && cl5HelperEntry((char *)key->data, NULL)) {
- slapi_ch_free(&key->data);
- slapi_ch_free(&(data.data));
- continue;
- }
- /* format entry */
- rc = cl5DBData2Entry(data.data, data.size, entry);
- slapi_ch_free(&(data.data));
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5PurgeGetFirstEntry - Failed to format entry: %d\n", rc);
- goto done;
- }
- it = (CL5Iterator *)slapi_ch_malloc(sizeof(CL5Iterator));
- it->cursor = cursor;
- object_acquire(obj);
- it->file = obj;
- *(CL5Iterator **)iterator = it;
- return CL5_SUCCESS;
- }
- slapi_ch_free(&key->data);
- slapi_ch_free(&(data.data));
- /* walked of the end of the file */
- if (rc == DB_NOTFOUND) {
- rc = CL5_NOTFOUND;
- goto done;
- }
- /* db error occured while iterating */
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5PurgeGetFirstEntry - Failed to get entry; db error - %d %s\n",
- rc, db_strerror(rc));
- rc = CL5_DB_ERROR;
- done:
- /*
- * We didn't success in assigning this cursor to the iterator,
- * so we need to free the cursor here.
- */
- if (cursor)
- cursor->c_close(cursor);
- return rc;
- }
- /*
- * Get the next entry. If we get a lock error we will restart the process
- * starting at the current key.
- */
- static int
- _cl5PurgeGetNextEntry(CL5Entry *entry, void *iterator, DBT *key)
- {
- CL5Iterator *it;
- DBT data = {0};
- int rc;
- it = (CL5Iterator *)iterator;
- key->flags = DB_DBT_MALLOC;
- data.flags = DB_DBT_MALLOC;
- while ((rc = it->cursor->c_get(it->cursor, key, &data, DB_NEXT)) == 0) {
- if (cl5HelperEntry((char *)key->data, NULL)) {
- slapi_ch_free(&key->data);
- slapi_ch_free(&(data.data));
- continue;
- }
- /* format entry */
- rc = cl5DBData2Entry(data.data, data.size, entry);
- slapi_ch_free(&(data.data));
- if (rc != 0) {
- if (rc != CL5_DB_LOCK_ERROR) {
- /* Not a lock error, free the key */
- slapi_ch_free(&key->data);
- }
- slapi_log_err(rc == CL5_DB_LOCK_ERROR ? SLAPI_LOG_REPL : SLAPI_LOG_ERR,
- repl_plugin_name_cl,
- "_cl5PurgeGetNextEntry - Failed to format entry: %d\n",
- rc);
- }
- return rc;
- }
- slapi_ch_free(&(data.data));
- /* walked of the end of the file or entry is out of range */
- if (rc == 0 || rc == DB_NOTFOUND) {
- slapi_ch_free(&key->data);
- return CL5_NOTFOUND;
- }
- if (rc != CL5_DB_LOCK_ERROR) {
- /* Not a lock error, free the key */
- slapi_ch_free(&key->data);
- }
- /* cursor operation failed */
- slapi_log_err(rc == CL5_DB_LOCK_ERROR ? SLAPI_LOG_REPL : SLAPI_LOG_ERR,
- repl_plugin_name_cl,
- "_cl5PurgeGetNextEntry - Failed to get entry; db error - %d %s\n",
- rc, db_strerror(rc));
- return rc;
- }
- #define MAX_RETRIES 10
- /*
- * _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid)
- *
- * Clean the entire changelog of updates from the "cleaned rid" via CLEANALLRUV
- * Delete entries in batches so we don't consume too many db locks, and we don't
- * lockup the changelog during the entire purging process using one transaction.
- * We save the key from the last iteration so we don't have to start from the
- * beginning for each new iteration.
- */
- static void
- _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid)
- {
- slapi_operation_parameters op = {0};
- ReplicaId csn_rid;
- CL5Entry entry;
- DB_TXN *txnid = NULL;
- DBT key = {0};
- void *iterator = NULL;
- long totalTrimmed = 0;
- long trimmed = 0;
- char *starting_key = NULL;
- int batch_count = 0;
- int db_lock_retry_count = 0;
- int first_pass = 1;
- int finished = 0;
- int rc = 0;
- PR_ASSERT(obj);
- entry.op = &op;
- /*
- * Keep processing the changelog until we are done, shutting down, or we
- * maxed out on the db lock retries.
- */
- while (!finished && db_lock_retry_count < MAX_RETRIES && !slapi_is_shutting_down()) {
- trimmed = 0;
- /*
- * Sleep a bit to allow others to use the changelog - we can't hog the
- * changelog for the entire purge.
- */
- DS_Sleep(PR_MillisecondsToInterval(100));
- rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5PurgeRID - Failed to begin transaction; db error - %d %s. "
- "Changelog was not purged of rid(%d)\n",
- rc, db_strerror(rc), cleaned_rid);
- return;
- }
- /*
- * Check every changelog entry for the cleaned rid
- */
- rc = _cl5PurgeGetFirstEntry(obj, &entry, &iterator, txnid, first_pass ? 0 : cleaned_rid, &key);
- first_pass = 0;
- while (rc == CL5_SUCCESS && !slapi_is_shutting_down()) {
- /*
- * Store the new starting key - we need this starting key in case
- * we run out of locks and have to start the transaction over.
- */
- slapi_ch_free_string(&starting_key);
- starting_key = slapi_ch_strdup((char *)key.data);
- if (trimmed == 10000 || (batch_count && trimmed == batch_count)) {
- /*
- * Break out, and commit these deletes. Do not free the key,
- * we need it for the next pass.
- */
- cl5_operation_parameters_done(&op);
- db_lock_retry_count = 0; /* reset the retry count */
- break;
- }
- if (op.csn) {
- csn_rid = csn_get_replicaid(op.csn);
- if (csn_rid == cleaned_rid) {
- rc = _cl5CurrentDeleteEntry(iterator);
- if (rc != CL5_SUCCESS) {
- /* log error */
- cl5_operation_parameters_done(&op);
- if (rc == CL5_DB_LOCK_ERROR) {
- /*
- * Ran out of locks, need to restart the transaction.
- * Reduce the the batch count and reset the key to
- * the starting point
- */
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5PurgeRID - Ran out of db locks deleting entry. "
- "Reduce the batch value and restart.\n");
- batch_count = trimmed - 10;
- if (batch_count < 10) {
- batch_count = 10;
- }
- trimmed = 0;
- slapi_ch_free(&(key.data));
- key.data = starting_key;
- starting_key = NULL;
- db_lock_retry_count++;
- break;
- } else {
- /* fatal error */
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5PurgeRID - Fatal error (%d)\n", rc);
- slapi_ch_free(&(key.data));
- finished = 1;
- break;
- }
- }
- trimmed++;
- }
- }
- slapi_ch_free(&(key.data));
- cl5_operation_parameters_done(&op);
- rc = _cl5PurgeGetNextEntry(&entry, iterator, &key);
- if (rc == CL5_DB_LOCK_ERROR) {
- /*
- * Ran out of locks, need to restart the transaction.
- * Reduce the the batch count and reset the key to the starting
- * point.
- */
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5PurgeRID - Ran out of db locks getting the next entry. "
- "Reduce the batch value and restart.\n");
- batch_count = trimmed - 10;
- if (batch_count < 10) {
- batch_count = 10;
- }
- trimmed = 0;
- cl5_operation_parameters_done(&op);
- slapi_ch_free(&(key.data));
- key.data = starting_key;
- starting_key = NULL;
- db_lock_retry_count++;
- break;
- }
- }
- if (rc == CL5_NOTFOUND) {
- /* Scanned the entire changelog, we're done */
- finished = 1;
- }
- /* Destroy the iterator before we finish with the txn */
- cl5DestroyIterator(iterator);
- /*
- * Commit or abort the txn
- */
- if (rc == CL5_SUCCESS || rc == CL5_NOTFOUND) {
- rc = TXN_COMMIT(txnid);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5PurgeRID - Failed to commit transaction; db error - %d %s. "
- "Changelog was not completely purged of rid (%d)\n",
- rc, db_strerror(rc), cleaned_rid);
- break;
- } else if (finished) {
- /* We're done */
- totalTrimmed += trimmed;
- break;
- } else {
- /* Not done yet */
- totalTrimmed += trimmed;
- trimmed = 0;
- }
- } else {
- rc = TXN_ABORT(txnid);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5PurgeRID - Failed to abort transaction; db error - %d %s. "
- "Changelog was not completely purged of rid (%d)\n",
- rc, db_strerror(rc), cleaned_rid);
- }
- if (batch_count == 0) {
- /* This was not a retry. Fatal error, break out */
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5PurgeRID - Changelog was not purged of rid (%d)\n",
- cleaned_rid);
- break;
- }
- }
- }
- slapi_ch_free_string(&starting_key);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5PurgeRID - Removed (%ld entries) that originated from rid (%d)\n",
- totalTrimmed, cleaned_rid);
- }
- /* Note that each file contains changes for a single replicated area.
- trimming algorithm:
- */
- #define CL5_TRIM_MAX_PER_TRANSACTION 10
- static void
- _cl5TrimFile(Object *obj, long *numToTrim)
- {
- DB_TXN *txnid;
- RUV *ruv = NULL;
- CL5Entry entry;
- slapi_operation_parameters op = {0};
- ReplicaId csn_rid;
- void *it;
- int finished = 0, totalTrimmed = 0, count;
- PRBool abort;
- char strCSN[CSN_STRSIZE];
- int rc;
- PR_ASSERT(obj);
- /* construct the ruv up to which we can purge */
- rc = _cl5GetRUV2Purge2(obj, &ruv);
- if (rc != CL5_SUCCESS || ruv == NULL) {
- return;
- }
- entry.op = &op;
- while (!finished && !slapi_is_shutting_down()) {
- it = NULL;
- count = 0;
- txnid = NULL;
- abort = PR_FALSE;
- /* DB txn lock accessed pages until the end of the transaction. */
- rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5TrimFile - Failed to begin transaction; db error - %d %s\n",
- rc, db_strerror(rc));
- finished = PR_TRUE;
- break;
- }
- finished = _cl5GetFirstEntry(obj, &entry, &it, txnid);
- while (!finished && !slapi_is_shutting_down()) {
- /*
- * This change can be trimmed if it exceeds purge
- * parameters and has been seen by all consumers.
- */
- if (op.csn == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "_cl5TrimFile - "
- "Operation missing csn, moving on to next entry.\n");
- cl5_operation_parameters_done(&op);
- finished = _cl5GetNextEntry(&entry, it);
- continue;
- }
- csn_rid = csn_get_replicaid(op.csn);
- if ((*numToTrim > 0 || _cl5CanTrim(entry.time, numToTrim)) &&
- ruv_covers_csn_strict(ruv, op.csn)) {
- rc = _cl5CurrentDeleteEntry(it);
- if (rc == CL5_SUCCESS) {
- rc = _cl5UpdateRUV(obj, op.csn, PR_FALSE, PR_TRUE);
- }
- if (rc == CL5_SUCCESS) {
- if (*numToTrim > 0)
- (*numToTrim)--;
- count++;
- } else {
- /* The above two functions have logged the error */
- abort = PR_TRUE;
- }
- } else {
- /* The changelog DB is time ordered. If we can not trim
- * a CSN, we will not be allowed to trim the rest of the
- * CSNs generally. However, the maxcsn of each replica ID
- * is always kept in the changelog as an anchor for
- * replaying future changes. We have to skip those anchor
- * CSNs, otherwise a non-active replica ID could block
- * the trim forever.
- */
- CSN *maxcsn = NULL;
- ruv_get_largest_csn_for_replica(ruv, csn_rid, &maxcsn);
- if (csn_compare(op.csn, maxcsn) != 0) {
- /* op.csn is not anchor CSN */
- finished = 1;
- } else {
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5TrimFile - Changelog purge skipped anchor csn %s\n",
- csn_as_string(maxcsn, PR_FALSE, strCSN));
- }
- /* extra read to skip the current record */
- cl5_operation_parameters_done(&op);
- finished = _cl5GetNextEntry(&entry, it);
- }
- if (maxcsn)
- csn_free(&maxcsn);
- }
- cl5_operation_parameters_done(&op);
- if (finished || abort || count >= CL5_TRIM_MAX_PER_TRANSACTION) {
- /* If we reach CL5_TRIM_MAX_PER_TRANSACTION,
- * we close the cursor,
- * commit the transaction and restart a new transaction
- */
- break;
- }
- finished = _cl5GetNextEntry(&entry, it);
- }
- /* MAB: We need to close the cursor BEFORE the txn commits/aborts.
- * If we don't respect this order, we'll screw up the database,
- * placing it in DB_RUNRECOVERY mode
- */
- cl5DestroyIterator(it);
- if (abort) {
- finished = 1;
- rc = TXN_ABORT(txnid);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5TrimFile - Failed to abort transaction; db error - %d %s\n",
- rc, db_strerror(rc));
- }
- } else {
- rc = TXN_COMMIT(txnid);
- if (rc != 0) {
- finished = 1;
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5TrimFile - Failed to commit transaction; db error - %d %s\n",
- rc, db_strerror(rc));
- } else {
- totalTrimmed += count;
- }
- }
- } /* While (!finished) */
- if (ruv)
- ruv_destroy(&ruv);
- if (totalTrimmed) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5TrimFile - Trimmed %d changes from the changelog\n",
- totalTrimmed);
- }
- }
- static PRBool
- _cl5CanTrim(time_t time, long *numToTrim)
- {
- *numToTrim = 0;
- if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0) {
- return PR_FALSE;
- }
- if (s_cl5Desc.dbTrim.maxAge == 0) {
- *numToTrim = cl5GetOperationCount(NULL) - s_cl5Desc.dbTrim.maxEntries;
- return (*numToTrim > 0);
- }
- if (s_cl5Desc.dbTrim.maxEntries > 0 &&
- (*numToTrim = cl5GetOperationCount(NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) {
- return PR_TRUE;
- }
- if (time) {
- return (slapi_current_utc_time() - time > s_cl5Desc.dbTrim.maxAge);
- } else {
- return PR_TRUE;
- }
- }
- static int
- _cl5ReadRUV(const char *replGen, Object *obj, PRBool purge)
- {
- int rc;
- char csnStr[CSN_STRSIZE];
- DBT key = {0}, data = {0};
- struct berval **vals = NULL;
- CL5DBFile *file;
- char *pos;
- char *agmt_name;
- PR_ASSERT(replGen && obj);
- file = (CL5DBFile *)object_get_data(obj);
- PR_ASSERT(file);
- agmt_name = get_thread_private_agmtname();
- if (purge) { /* read purge vector entry */
- key.data = _cl5GetHelperEntryKey(PURGE_RUV_TIME, csnStr);
- } else { /* read upper bound vector */
- key.data = _cl5GetHelperEntryKey(MAX_RUV_TIME, csnStr);
- }
- key.size = CSN_STRSIZE;
- data.flags = DB_DBT_MALLOC;
- rc = file->db->get(file->db, NULL /*txn*/, &key, &data, 0);
- switch (rc) {
- case 0:
- pos = data.data;
- rc = _cl5ReadBervals(&vals, &pos, data.size);
- slapi_ch_free(&(data.data));
- if (rc != CL5_SUCCESS)
- goto done;
- if (purge) {
- rc = ruv_init_from_bervals(vals, &file->purgeRUV);
- } else {
- rc = ruv_init_from_bervals(vals, &file->maxRUV);
- }
- if (rc != RUV_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5ReadRUV - %s - Failed to initialize %s ruv; "
- "RUV error %d\n",
- agmt_name, purge ? "purge" : "upper bound", rc);
- rc = CL5_RUV_ERROR;
- goto done;
- }
- /* delete the entry; it is re-added when file
- is successfully closed */
- file->db->del(file->db, NULL, &key, 0);
- rc = CL5_SUCCESS;
- goto done;
- case DB_NOTFOUND: /* RUV is lost - need to construct */
- rc = _cl5ConstructRUV(replGen, obj, purge);
- goto done;
- default:
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5ReadRUV - %s - Failed to get purge RUV; "
- "db error - %d %s\n",
- agmt_name, rc, db_strerror(rc));
- rc = CL5_DB_ERROR;
- goto done;
- }
- done:
- ber_bvecfree(vals);
- return rc;
- }
- static int
- _cl5WriteRUV(CL5DBFile *file, PRBool purge)
- {
- int rc;
- DBT key = {0}, data = {0};
- char csnStr[CSN_STRSIZE];
- struct berval **vals;
- DB_TXN *txnid = NULL;
- char *buff;
- if ((purge && file->purgeRUV == NULL) || (!purge && file->maxRUV == NULL))
- return CL5_SUCCESS;
- if (purge) {
- /* Set the minimum CSN of each vector to a dummy CSN that contains
- * just a replica ID, e.g. 00000000000000010000.
- * The minimum CSN in a purge RUV is not used so the value doesn't
- * matter, but it needs to be set to something so that it can be
- * flushed to changelog at shutdown and parsed at startup with the
- * regular string-to-RUV parsing routines. */
- ruv_insert_dummy_min_csn(file->purgeRUV);
- key.data = _cl5GetHelperEntryKey(PURGE_RUV_TIME, csnStr);
- rc = ruv_to_bervals(file->purgeRUV, &vals);
- } else {
- key.data = _cl5GetHelperEntryKey(MAX_RUV_TIME, csnStr);
- rc = ruv_to_bervals(file->maxRUV, &vals);
- }
- key.size = CSN_STRSIZE;
- rc = _cl5WriteBervals(vals, &buff, &data.size);
- data.data = buff;
- ber_bvecfree(vals);
- if (rc != CL5_SUCCESS) {
- return rc;
- }
- rc = file->db->put(file->db, txnid, &key, &data, 0);
- slapi_ch_free(&(data.data));
- if (rc == 0) {
- return CL5_SUCCESS;
- } else {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5WriteRUV - Failed to write %s RUV for file %s; db error - %d (%s)\n",
- purge ? "purge" : "upper bound", file->name, rc, db_strerror(rc));
- if (CL5_OS_ERR_IS_DISKFULL(rc)) {
- cl5_set_diskfull();
- return CL5_DB_ERROR;
- }
- return CL5_DB_ERROR;
- }
- }
- /* This is a very slow process since we have to read every changelog entry.
- Hopefully, this function is not called too often */
- static int
- _cl5ConstructRUV(const char *replGen, Object *obj, PRBool purge)
- {
- int rc;
- CL5Entry entry;
- void *iterator = NULL;
- slapi_operation_parameters op = {0};
- CL5DBFile *file;
- ReplicaId rid;
- PR_ASSERT(replGen && obj);
- file = (CL5DBFile *)object_get_data(obj);
- PR_ASSERT(file);
- /* construct the RUV */
- if (purge)
- rc = ruv_init_new(replGen, 0, NULL, &file->purgeRUV);
- else
- rc = ruv_init_new(replGen, 0, NULL, &file->maxRUV);
- if (rc != RUV_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV - "
- "Failed to initialize %s RUV for file %s; ruv error - %d\n",
- purge ? "purge" : "upper bound", file->name, rc);
- return CL5_RUV_ERROR;
- }
- slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name_cl,
- "_cl5ConstructRUV - Rebuilding the replication changelog RUV, "
- "this may take several minutes...\n");
- entry.op = &op;
- rc = _cl5GetFirstEntry(obj, &entry, &iterator, NULL);
- while (rc == CL5_SUCCESS) {
- if (op.csn) {
- rid = csn_get_replicaid(op.csn);
- } else {
- slapi_log_err(SLAPI_LOG_WARNING, repl_plugin_name_cl, "_cl5ConstructRUV - "
- "Operation missing csn, moving on to next entry.\n");
- cl5_operation_parameters_done(&op);
- rc = _cl5GetNextEntry(&entry, iterator);
- continue;
- }
- if (is_cleaned_rid(rid)) {
- /* skip this entry as the rid is invalid */
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV - "
- "Skipping entry because its csn contains a cleaned rid(%d)\n",
- rid);
- cl5_operation_parameters_done(&op);
- rc = _cl5GetNextEntry(&entry, iterator);
- continue;
- }
- if (purge)
- rc = ruv_set_csns_keep_smallest(file->purgeRUV, op.csn);
- else
- rc = ruv_set_csns(file->maxRUV, op.csn, NULL);
- cl5_operation_parameters_done(&op);
- if (rc != RUV_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV - "
- "Failed to update %s RUV for file %s; ruv error - %d\n",
- purge ? "purge" : "upper bound", file->name, rc);
- rc = CL5_RUV_ERROR;
- continue;
- }
- rc = _cl5GetNextEntry(&entry, iterator);
- }
- cl5_operation_parameters_done(&op);
- if (iterator)
- cl5DestroyIterator(iterator);
- if (rc == CL5_NOTFOUND) {
- rc = CL5_SUCCESS;
- } else {
- if (purge)
- ruv_destroy(&file->purgeRUV);
- else
- ruv_destroy(&file->maxRUV);
- }
- slapi_log_err(SLAPI_LOG_NOTICE, repl_plugin_name_cl,
- "_cl5ConstructRUV - Rebuilding replication changelog RUV complete. Result %d (%s)\n",
- rc, rc ? "Failed to rebuild changelog RUV" : "Success");
- return rc;
- }
- static int
- _cl5UpdateRUV(Object *obj, CSN *csn, PRBool newReplica, PRBool purge)
- {
- ReplicaId rid;
- int rc = RUV_SUCCESS; /* initialize rc to avoid erroneous logs */
- CL5DBFile *file;
- PR_ASSERT(obj && csn);
- file = (CL5DBFile *)object_get_data(obj);
- /*
- * if purge is TRUE, file->purgeRUV must be set;
- * if purge is FALSE, maxRUV must be set
- */
- PR_ASSERT(file && ((purge && file->purgeRUV) || (!purge && file->maxRUV)));
- rid = csn_get_replicaid(csn);
- /* update vector only if this replica is not yet part of RUV */
- if (purge && newReplica) {
- if (ruv_contains_replica(file->purgeRUV, rid)) {
- return CL5_SUCCESS;
- } else {
- /* if the replica is not part of the purgeRUV yet, add it unless it's from a cleaned rid */
- ruv_add_replica(file->purgeRUV, rid, multimaster_get_local_purl());
- }
- } else {
- if (purge) {
- rc = ruv_set_csns(file->purgeRUV, csn, NULL);
- } else {
- rc = ruv_set_csns(file->maxRUV, csn, NULL);
- }
- }
- if (rc != RUV_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5UpdatePurgeRUV - "
- "Failed to update %s RUV for file %s; ruv error - %d\n",
- purge ? "purge" : "upper bound", file->name, rc);
- return CL5_RUV_ERROR;
- }
- return CL5_SUCCESS;
- }
- static int
- _cl5EnumConsumerRUV(const ruv_enum_data *element, void *arg)
- {
- int rc;
- RUV *ruv;
- CSN *csn = NULL;
- PR_ASSERT(element && element->csn && arg);
- ruv = (RUV *)arg;
- rc = ruv_get_largest_csn_for_replica(ruv, csn_get_replicaid(element->csn), &csn);
- if (rc != RUV_SUCCESS || csn == NULL || csn_compare(element->csn, csn) < 0) {
- ruv_set_max_csn(ruv, element->csn, NULL);
- }
- if (csn)
- csn_free(&csn);
- return 0;
- }
- static int
- _cl5GetRUV2Purge2(Object *fileObj, RUV **ruv)
- {
- int rc = CL5_SUCCESS;
- CL5DBFile *dbFile;
- Object *rObj = NULL;
- Replica *r = NULL;
- Object *agmtObj = NULL;
- Repl_Agmt *agmt;
- Object *consRUVObj, *supRUVObj;
- RUV *consRUV, *supRUV;
- CSN *csn;
- PR_ASSERT(fileObj && ruv);
- if (!ruv) {
- rc = CL5_UNKNOWN_ERROR;
- goto done;
- }
- dbFile = (CL5DBFile *)object_get_data(fileObj);
- PR_ASSERT(dbFile);
- rObj = replica_get_by_name(dbFile->replName);
- PR_ASSERT(rObj);
- if (!rObj) {
- rc = CL5_NOTFOUND;
- goto done;
- }
- r = (Replica *)object_get_data(rObj);
- PR_ASSERT(r);
- /* We start with this replica's RUV. See note in _cl5DoTrimming */
- supRUVObj = replica_get_ruv(r);
- PR_ASSERT(supRUVObj);
- supRUV = (RUV *)object_get_data(supRUVObj);
- PR_ASSERT(supRUV);
- *ruv = ruv_dup(supRUV);
- object_release(supRUVObj);
- agmtObj = agmtlist_get_first_agreement_for_replica(r);
- while (agmtObj) {
- agmt = (Repl_Agmt *)object_get_data(agmtObj);
- PR_ASSERT(agmt);
- if (!agmt_is_enabled(agmt)) {
- agmtObj = agmtlist_get_next_agreement_for_replica(r, agmtObj);
- continue;
- }
- consRUVObj = agmt_get_consumer_ruv(agmt);
- if (consRUVObj) {
- consRUV = (RUV *)object_get_data(consRUVObj);
- rc = ruv_enumerate_elements(consRUV, _cl5EnumConsumerRUV, *ruv);
- if (rc != RUV_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetRUV2Purge2 - "
- "Failed to construct ruv; ruv error - %d\n",
- rc);
- rc = CL5_RUV_ERROR;
- object_release(consRUVObj);
- object_release(agmtObj);
- break;
- }
- object_release(consRUVObj);
- }
- agmtObj = agmtlist_get_next_agreement_for_replica(r, agmtObj);
- }
- /* check if there is any data in the constructed ruv - otherwise get rid of it */
- if (ruv_get_max_csn(*ruv, &csn) != RUV_SUCCESS || csn == NULL) {
- ruv_destroy(ruv);
- } else {
- csn_free(&csn);
- }
- done:
- if (rObj)
- object_release(rObj);
- if (rc != CL5_SUCCESS && ruv)
- ruv_destroy(ruv);
- return rc;
- }
- static int
- _cl5GetEntryCount(CL5DBFile *file)
- {
- int rc;
- char csnStr[CSN_STRSIZE];
- DBT key = {0}, data = {0};
- DB_BTREE_STAT *stats = NULL;
- PR_ASSERT(file);
- /* read entry count. if the entry is there - the file was successfully closed
- last time it was used */
- key.data = _cl5GetHelperEntryKey(ENTRY_COUNT_TIME, csnStr);
- key.size = CSN_STRSIZE;
- data.flags = DB_DBT_MALLOC;
- rc = file->db->get(file->db, NULL /*txn*/, &key, &data, 0);
- switch (rc) {
- case 0:
- file->entryCount = *(int *)data.data;
- slapi_ch_free(&(data.data));
- /* delete the entry. the entry is re-added when file
- is successfully closed */
- file->db->del(file->db, NULL, &key, 0);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5GetEntryCount - %d changes for replica %s\n",
- file->entryCount, file->replName);
- return CL5_SUCCESS;
- case DB_NOTFOUND:
- file->entryCount = 0;
- rc = file->db->stat(file->db, NULL, (void *)&stats, 0);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5GetEntryCount - Failed to get changelog statistics; "
- "db error - %d %s\n",
- rc, db_strerror(rc));
- return CL5_DB_ERROR;
- }
- #ifdef DB30
- file->entryCount = stats->bt_nrecs;
- #else /* DB31 */
- file->entryCount = stats->bt_ndata;
- #endif
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5GetEntryCount - %d changes for replica %s\n",
- file->entryCount, file->replName);
- slapi_ch_free((void **)&stats);
- return CL5_SUCCESS;
- default:
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5GetEntryCount - Failed to get count entry; "
- "db error - %d %s\n",
- rc, db_strerror(rc));
- return CL5_DB_ERROR;
- }
- }
- static int
- _cl5WriteEntryCount(CL5DBFile *file)
- {
- int rc;
- DBT key = {0}, data = {0};
- char csnStr[CSN_STRSIZE];
- DB_TXN *txnid = NULL;
- key.data = _cl5GetHelperEntryKey(ENTRY_COUNT_TIME, csnStr);
- key.size = CSN_STRSIZE;
- data.data = (void *)&file->entryCount;
- data.size = sizeof(file->entryCount);
- rc = file->db->put(file->db, txnid, &key, &data, 0);
- if (rc == 0) {
- return CL5_SUCCESS;
- } else {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5WriteEntryCount - "
- "Failed to write count entry for file %s; db error - %d %s\n",
- file->name, rc, db_strerror(rc));
- if (CL5_OS_ERR_IS_DISKFULL(rc)) {
- cl5_set_diskfull();
- return CL5_DB_ERROR;
- }
- return CL5_DB_ERROR;
- }
- }
- static const char *
- _cl5OperationType2Str(int type)
- {
- switch (type) {
- case SLAPI_OPERATION_ADD:
- return T_ADDCTSTR;
- case SLAPI_OPERATION_MODIFY:
- return T_MODIFYCTSTR;
- case SLAPI_OPERATION_MODRDN:
- return T_MODRDNCTSTR;
- case SLAPI_OPERATION_DELETE:
- return T_DELETECTSTR;
- default:
- return NULL;
- }
- }
- static int
- _cl5Str2OperationType(const char *str)
- {
- if (strcasecmp(str, T_ADDCTSTR) == 0)
- return SLAPI_OPERATION_ADD;
- if (strcasecmp(str, T_MODIFYCTSTR) == 0)
- return SLAPI_OPERATION_MODIFY;
- if (strcasecmp(str, T_MODRDNCTSTR) == 0)
- return SLAPI_OPERATION_MODRDN;
- if (strcasecmp(str, T_DELETECTSTR) == 0)
- return SLAPI_OPERATION_DELETE;
- return -1;
- }
- static int
- _cl5Operation2LDIF(const slapi_operation_parameters *op, const char *replGen, char **ldifEntry, PRInt32 *lenLDIF)
- {
- int len = 2;
- lenstr *l = NULL;
- const char *strType;
- const char *strDeleteOldRDN = "false";
- char *buff, *start;
- LDAPMod **add_mods;
- char *rawDN = NULL;
- char strCSN[CSN_STRSIZE];
- PR_ASSERT(op && replGen && ldifEntry && IsValidOperation(op));
- strType = _cl5OperationType2Str(op->operation_type);
- csn_as_string(op->csn, PR_FALSE, strCSN);
- /* find length of the buffer */
- len += LDIF_SIZE_NEEDED(strlen(T_CHANGETYPESTR), strlen(strType));
- len += LDIF_SIZE_NEEDED(strlen(T_REPLGEN), strlen(replGen));
- len += LDIF_SIZE_NEEDED(strlen(T_CSNSTR), strlen(strCSN));
- len += LDIF_SIZE_NEEDED(strlen(T_UNIQUEIDSTR), strlen(op->target_address.uniqueid));
- switch (op->operation_type) {
- case SLAPI_OPERATION_ADD:
- if (NULL == op->p.p_add.target_entry) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5Operation2LDIF - ADD - entry is NULL\n");
- return CL5_BAD_FORMAT;
- }
- if (op->p.p_add.parentuniqueid)
- len += LDIF_SIZE_NEEDED(strlen(T_PARENTIDSTR), strlen(op->p.p_add.parentuniqueid));
- slapi_entry2mods(op->p.p_add.target_entry, &rawDN, &add_mods);
- len += LDIF_SIZE_NEEDED(strlen(T_DNSTR), strlen(rawDN));
- l = make_changes_string(add_mods, NULL);
- len += LDIF_SIZE_NEEDED(strlen(T_CHANGESTR), l->ls_len);
- ldap_mods_free(add_mods, 1);
- break;
- case SLAPI_OPERATION_MODIFY:
- if (NULL == op->p.p_modify.modify_mods) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5Operation2LDIF - MODIFY - mods are NULL\n");
- return CL5_BAD_FORMAT;
- }
- len += LDIF_SIZE_NEEDED(strlen(T_DNSTR), REPL_GET_DN_LEN(&op->target_address));
- l = make_changes_string(op->p.p_modify.modify_mods, NULL);
- len += LDIF_SIZE_NEEDED(strlen(T_CHANGESTR), l->ls_len);
- break;
- case SLAPI_OPERATION_MODRDN:
- if (NULL == op->p.p_modrdn.modrdn_mods) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5Operation2LDIF - MODRDN - mods are NULL\n");
- return CL5_BAD_FORMAT;
- }
- len += LDIF_SIZE_NEEDED(strlen(T_DNSTR), REPL_GET_DN_LEN(&op->target_address));
- len += LDIF_SIZE_NEEDED(strlen(T_NEWRDNSTR), strlen(op->p.p_modrdn.modrdn_newrdn));
- strDeleteOldRDN = (op->p.p_modrdn.modrdn_deloldrdn ? "true" : "false");
- len += LDIF_SIZE_NEEDED(strlen(T_DRDNFLAGSTR),
- strlen(strDeleteOldRDN));
- if (REPL_GET_DN(&op->p.p_modrdn.modrdn_newsuperior_address))
- len += LDIF_SIZE_NEEDED(strlen(T_NEWSUPERIORDNSTR),
- REPL_GET_DN_LEN(&op->p.p_modrdn.modrdn_newsuperior_address));
- if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)
- len += LDIF_SIZE_NEEDED(strlen(T_NEWSUPERIORIDSTR),
- strlen(op->p.p_modrdn.modrdn_newsuperior_address.uniqueid));
- l = make_changes_string(op->p.p_modrdn.modrdn_mods, NULL);
- len += LDIF_SIZE_NEEDED(strlen(T_CHANGESTR), l->ls_len);
- break;
- case SLAPI_OPERATION_DELETE:
- if (NULL == REPL_GET_DN(&op->target_address)) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5Operation2LDIF - DELETE - target dn is NULL\n");
- return CL5_BAD_FORMAT;
- }
- len += LDIF_SIZE_NEEDED(strlen(T_DNSTR), REPL_GET_DN_LEN(&op->target_address));
- break;
- default:
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5Operation2LDIF - Invalid operation type - %lu\n", op->operation_type);
- return CL5_BAD_FORMAT;
- }
- /* allocate buffer */
- buff = slapi_ch_malloc(len);
- start = buff;
- if (buff == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5Operation2LDIF: memory allocation failed\n");
- return CL5_MEMORY_ERROR;
- }
- /* fill buffer */
- slapi_ldif_put_type_and_value_with_options(&buff, T_CHANGETYPESTR, (char *)strType, strlen(strType), 0);
- slapi_ldif_put_type_and_value_with_options(&buff, T_REPLGEN, (char *)replGen, strlen(replGen), 0);
- slapi_ldif_put_type_and_value_with_options(&buff, T_CSNSTR, (char *)strCSN, strlen(strCSN), 0);
- slapi_ldif_put_type_and_value_with_options(&buff, T_UNIQUEIDSTR, op->target_address.uniqueid,
- strlen(op->target_address.uniqueid), 0);
- switch (op->operation_type) {
- case SLAPI_OPERATION_ADD:
- if (op->p.p_add.parentuniqueid)
- slapi_ldif_put_type_and_value_with_options(&buff, T_PARENTIDSTR,
- op->p.p_add.parentuniqueid, strlen(op->p.p_add.parentuniqueid), 0);
- slapi_ldif_put_type_and_value_with_options(&buff, T_DNSTR, rawDN, strlen(rawDN), 0);
- slapi_ldif_put_type_and_value_with_options(&buff, T_CHANGESTR, l->ls_buf, l->ls_len, 0);
- slapi_ch_free((void **)&rawDN);
- break;
- case SLAPI_OPERATION_MODIFY:
- slapi_ldif_put_type_and_value_with_options(&buff, T_DNSTR, REPL_GET_DN(&op->target_address),
- REPL_GET_DN_LEN(&op->target_address), 0);
- slapi_ldif_put_type_and_value_with_options(&buff, T_CHANGESTR, l->ls_buf, l->ls_len, 0);
- break;
- case SLAPI_OPERATION_MODRDN:
- slapi_ldif_put_type_and_value_with_options(&buff, T_DNSTR, REPL_GET_DN(&op->target_address),
- REPL_GET_DN_LEN(&op->target_address), 0);
- slapi_ldif_put_type_and_value_with_options(&buff, T_NEWRDNSTR, op->p.p_modrdn.modrdn_newrdn,
- strlen(op->p.p_modrdn.modrdn_newrdn), 0);
- slapi_ldif_put_type_and_value_with_options(&buff, T_DRDNFLAGSTR, strDeleteOldRDN,
- strlen(strDeleteOldRDN), 0);
- if (REPL_GET_DN(&op->p.p_modrdn.modrdn_newsuperior_address))
- slapi_ldif_put_type_and_value_with_options(&buff, T_NEWSUPERIORDNSTR,
- REPL_GET_DN(&op->p.p_modrdn.modrdn_newsuperior_address),
- REPL_GET_DN_LEN(&op->p.p_modrdn.modrdn_newsuperior_address), 0);
- if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)
- slapi_ldif_put_type_and_value_with_options(&buff, T_NEWSUPERIORIDSTR,
- op->p.p_modrdn.modrdn_newsuperior_address.uniqueid,
- strlen(op->p.p_modrdn.modrdn_newsuperior_address.uniqueid), 0);
- slapi_ldif_put_type_and_value_with_options(&buff, T_CHANGESTR, l->ls_buf, l->ls_len, 0);
- break;
- case SLAPI_OPERATION_DELETE:
- slapi_ldif_put_type_and_value_with_options(&buff, T_DNSTR, REPL_GET_DN(&op->target_address),
- REPL_GET_DN_LEN(&op->target_address), 0);
- break;
- }
- *buff = '\n';
- buff++;
- *buff = '\0';
- *ldifEntry = start;
- *lenLDIF = buff - start;
- if (l)
- lenstr_free(&l);
- return CL5_SUCCESS;
- }
- static int
- _cl5LDIF2Operation(char *ldifEntry, slapi_operation_parameters *op, char **replGen)
- {
- int rc;
- int rval = CL5_BAD_FORMAT;
- char *next, *line;
- struct berval type, value;
- struct berval bv_null = {0, NULL};
- int freeval = 0;
- Slapi_Mods *mods;
- char *rawDN = NULL;
- char *ldifEntryWork = slapi_ch_strdup(ldifEntry);
- PR_ASSERT(op && ldifEntry && replGen);
- memset(op, 0, sizeof(*op));
- next = ldifEntryWork;
- while ((line = ldif_getline(&next)) != NULL) {
- if (*line == '\n' || *line == '\0') {
- break;
- }
- /* this call modifies ldifEntry */
- type = bv_null;
- value = bv_null;
- rc = slapi_ldif_parse_line(line, &type, &value, &freeval);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5LDIF2Operation - Failed to parse ldif line, moving on...\n");
- continue;
- }
- if (strncasecmp(type.bv_val, T_CHANGETYPESTR,
- strlen(T_CHANGETYPESTR) > type.bv_len ? strlen(T_CHANGETYPESTR) : type.bv_len) == 0) {
- op->operation_type = _cl5Str2OperationType(value.bv_val);
- } else if (strncasecmp(type.bv_val, T_REPLGEN, type.bv_len) == 0) {
- *replGen = slapi_ch_strdup(value.bv_val);
- } else if (strncasecmp(type.bv_val, T_CSNSTR, type.bv_len) == 0) {
- op->csn = csn_new_by_string(value.bv_val);
- } else if (strncasecmp(type.bv_val, T_UNIQUEIDSTR, type.bv_len) == 0) {
- op->target_address.uniqueid = slapi_ch_strdup(value.bv_val);
- } else if (strncasecmp(type.bv_val, T_DNSTR, type.bv_len) == 0) {
- PR_ASSERT(op->operation_type);
- if (op->operation_type == SLAPI_OPERATION_ADD) {
- rawDN = slapi_ch_strdup(value.bv_val);
- op->target_address.sdn = slapi_sdn_new_dn_byval(rawDN);
- } else
- op->target_address.sdn = slapi_sdn_new_dn_byval(value.bv_val);
- } else if (strncasecmp(type.bv_val, T_PARENTIDSTR, type.bv_len) == 0) {
- op->p.p_add.parentuniqueid = slapi_ch_strdup(value.bv_val);
- } else if (strncasecmp(type.bv_val, T_NEWRDNSTR, type.bv_len) == 0) {
- op->p.p_modrdn.modrdn_newrdn = slapi_ch_strdup(value.bv_val);
- } else if (strncasecmp(type.bv_val, T_DRDNFLAGSTR, type.bv_len) == 0) {
- op->p.p_modrdn.modrdn_deloldrdn = (strncasecmp(value.bv_val, "true", value.bv_len) ? PR_FALSE : PR_TRUE);
- } else if (strncasecmp(type.bv_val, T_NEWSUPERIORDNSTR, type.bv_len) == 0) {
- op->p.p_modrdn.modrdn_newsuperior_address.sdn = slapi_sdn_new_dn_byval(value.bv_val);
- } else if (strncasecmp(type.bv_val, T_NEWSUPERIORIDSTR, type.bv_len) == 0) {
- op->p.p_modrdn.modrdn_newsuperior_address.uniqueid = slapi_ch_strdup(value.bv_val);
- } else if (strncasecmp(type.bv_val, T_CHANGESTR,
- strlen(T_CHANGESTR) > type.bv_len ? strlen(T_CHANGESTR) : type.bv_len) == 0) {
- PR_ASSERT(op->operation_type);
- switch (op->operation_type) {
- case SLAPI_OPERATION_ADD:
- /*
- * When it comes here, case T_DNSTR is already
- * passed and rawDN is supposed to set.
- * But it's a good idea to make sure it is
- * not NULL.
- */
- if (NULL == rawDN) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5LDIF2Operation - corrupted format "
- "for operation type - %lu\n",
- op->operation_type);
- slapi_ch_free_string(&ldifEntryWork);
- return CL5_BAD_FORMAT;
- }
- mods = parse_changes_string(value.bv_val);
- PR_ASSERT(mods);
- slapi_mods2entry(&(op->p.p_add.target_entry), rawDN,
- slapi_mods_get_ldapmods_byref(mods));
- slapi_ch_free((void **)&rawDN);
- slapi_mods_free(&mods);
- break;
- case SLAPI_OPERATION_MODIFY:
- mods = parse_changes_string(value.bv_val);
- PR_ASSERT(mods);
- op->p.p_modify.modify_mods = slapi_mods_get_ldapmods_passout(mods);
- slapi_mods_free(&mods);
- break;
- case SLAPI_OPERATION_MODRDN:
- mods = parse_changes_string(value.bv_val);
- PR_ASSERT(mods);
- op->p.p_modrdn.modrdn_mods = slapi_mods_get_ldapmods_passout(mods);
- slapi_mods_free(&mods);
- break;
- default:
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5LDIF2Operation - Invalid operation type - %lu\n",
- op->operation_type);
- if (freeval) {
- slapi_ch_free_string(&value.bv_val);
- }
- slapi_ch_free_string(&ldifEntryWork);
- return CL5_BAD_FORMAT;
- }
- }
- if (freeval) {
- slapi_ch_free_string(&value.bv_val);
- }
- }
- if ((0 != strncmp(ldifEntryWork, "clpurgeruv", 10)) && /* skip RUV; */
- (0 != strncmp(ldifEntryWork, "clmaxruv", 8))) /* RUV has NULL op */
- {
- if (IsValidOperation(op)) {
- rval = CL5_SUCCESS;
- } else {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5LDIF2Operation - Invalid data format\n");
- }
- }
- slapi_ch_free_string(&ldifEntryWork);
- return rval;
- }
- static int
- _cl5WriteOperationTxn(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local __attribute__((unused)), void *txn)
- {
- int rc;
- int cnt;
- DBT key = {0};
- DBT *data = NULL;
- char csnStr[CSN_STRSIZE];
- PRIntervalTime interval;
- CL5Entry entry;
- CL5DBFile *file = NULL;
- Object *file_obj = NULL;
- DB_TXN *txnid = NULL;
- DB_TXN *parent_txnid = (DB_TXN *)txn;
- rc = _cl5GetDBFileByReplicaName(replName, replGen, &file_obj);
- if (rc == CL5_NOTFOUND) {
- rc = _cl5DBOpenFileByReplicaName(replName, replGen, &file_obj,
- PR_TRUE /* check for duplicates */);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5WriteOperationTxn - Failed to find or open DB object for replica %s\n", replName);
- return rc;
- }
- } else if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5WriteOperationTxn - Failed to get db file for target dn (%s)",
- REPL_GET_DN(&op->target_address));
- return CL5_OBJSET_ERROR;
- }
- /* assign entry time - used for trimming */
- entry.time = slapi_current_utc_time();
- entry.op = (slapi_operation_parameters *)op;
- /* construct the key */
- key.data = csn_as_string(op->csn, PR_FALSE, csnStr);
- key.size = CSN_STRSIZE;
- /* construct the data */
- data = (DBT *)slapi_ch_calloc(1, sizeof(DBT));
- rc = _cl5Entry2DBData(&entry, (char **)&data->data, &data->size);
- if (rc != CL5_SUCCESS) {
- char s[CSN_STRSIZE];
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5WriteOperationTxn - Failed to convert entry with csn (%s) "
- "to db format\n",
- csn_as_string(op->csn, PR_FALSE, s));
- goto done;
- }
- file = (CL5DBFile *)object_get_data(file_obj);
- PR_ASSERT(file);
- /* if this is part of ldif2cl - just write the entry without transaction */
- if (s_cl5Desc.dbOpenMode == CL5_OPEN_LDIF2CL) {
- rc = file->db->put(file->db, NULL, &key, data, 0);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5WriteOperationTxn - Failed to write entry; db error - %d %s\n",
- rc, db_strerror(rc));
- if (CL5_OS_ERR_IS_DISKFULL(rc)) {
- cl5_set_diskfull();
- }
- rc = CL5_DB_ERROR;
- }
- goto done;
- }
- /* write the entry */
- rc = EAGAIN;
- cnt = 0;
- while ((rc == EAGAIN || rc == DB_LOCK_DEADLOCK) && cnt < MAX_TRIALS) {
- if (cnt != 0) {
- /* abort previous transaction */
- rc = TXN_ABORT(txnid);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5WriteOperationTxn - Failed to abort transaction; db error - %d %s\n",
- rc, db_strerror(rc));
- rc = CL5_DB_ERROR;
- goto done;
- }
- /* back off */
- interval = PR_MillisecondsToInterval(slapi_rand() % 100);
- DS_Sleep(interval);
- }
- /* begin transaction */
- rc = TXN_BEGIN(s_cl5Desc.dbEnv, parent_txnid, &txnid, 0);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5WriteOperationTxn - Failed to start transaction; db error - %d %s\n",
- rc, db_strerror(rc));
- rc = CL5_DB_ERROR;
- goto done;
- }
- rc = file->db->put(file->db, txnid, &key, data, 0);
- if (CL5_OS_ERR_IS_DISKFULL(rc)) {
- slapi_log_err(SLAPI_LOG_CRIT, repl_plugin_name_cl,
- "_cl5WriteOperationTxn - Changelog (%s) DISK FULL; db error - %d %s\n",
- s_cl5Desc.dbDir, rc, db_strerror(rc));
- cl5_set_diskfull();
- rc = CL5_DB_ERROR;
- goto done;
- }
- if (cnt != 0) {
- if (rc == 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "_cl5WriteOperationTxn - "
- "retry (%d) the transaction (csn=%s) succeeded\n",
- cnt, (char *)key.data);
- } else if ((cnt + 1) >= MAX_TRIALS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl, "_cl5WriteOperationTxn - "
- "retry (%d) the transaction (csn=%s) failed (rc=%d (%s))\n",
- cnt, (char *)key.data, rc, db_strerror(rc));
- }
- }
- cnt++;
- }
- if (rc == 0) /* we successfully added entry */
- {
- rc = TXN_COMMIT(txnid);
- } else {
- char s[CSN_STRSIZE];
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5WriteOperationTxn - Failed to write entry with csn (%s); "
- "db error - %d %s\n",
- csn_as_string(op->csn, PR_FALSE, s),
- rc, db_strerror(rc));
- rc = TXN_ABORT(txnid);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5WriteOperationTxn - Failed to abort transaction; db error - %d %s\n",
- rc, db_strerror(rc));
- }
- rc = CL5_DB_ERROR;
- goto done;
- }
- /* update entry count - we assume that all entries are new */
- PR_AtomicIncrement(&file->entryCount);
- /* update purge vector if we have not seen any changes from this replica before */
- _cl5UpdateRUV(file_obj, op->csn, PR_TRUE, PR_TRUE);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5WriteOperationTxn - Successfully written entry with csn (%s)\n", csnStr);
- rc = CL5_SUCCESS;
- done:
- if (data->data)
- slapi_ch_free(&(data->data));
- slapi_ch_free((void **)&data);
- if (file_obj)
- object_release(file_obj);
- return rc;
- }
- static int
- _cl5WriteOperation(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local)
- {
- return _cl5WriteOperationTxn(replName, replGen, op, local, NULL);
- }
- static int
- _cl5GetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid)
- {
- int rc;
- DBC *cursor = NULL;
- DBT key = {0}, data = {0};
- CL5Iterator *it;
- CL5DBFile *file;
- PR_ASSERT(obj && entry && iterator);
- file = (CL5DBFile *)object_get_data(obj);
- PR_ASSERT(file);
- /* create cursor */
- rc = file->db->cursor(file->db, txnid, &cursor, 0);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5GetFirstEntry - Failed to create cursor; db error - %d %s\n", rc, db_strerror(rc));
- rc = CL5_DB_ERROR;
- goto done;
- }
- key.flags = DB_DBT_MALLOC;
- data.flags = DB_DBT_MALLOC;
- while ((rc = cursor->c_get(cursor, &key, &data, DB_NEXT)) == 0) {
- /* skip service entries */
- if (cl5HelperEntry((char *)key.data, NULL)) {
- slapi_ch_free(&(key.data));
- slapi_ch_free(&(data.data));
- continue;
- }
- /* format entry */
- slapi_ch_free(&(key.data));
- rc = cl5DBData2Entry(data.data, data.size, entry);
- slapi_ch_free(&(data.data));
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5GetFirstOperation - Failed to format entry: %d\n", rc);
- goto done;
- }
- it = (CL5Iterator *)slapi_ch_malloc(sizeof(CL5Iterator));
- it->cursor = cursor;
- object_acquire(obj);
- it->file = obj;
- *(CL5Iterator **)iterator = it;
- return CL5_SUCCESS;
- }
- /*
- * Bug 430172 - memory leaks after db "get" deadlocks, e.g. in CL5 trim
- * Even when db->c_get() does not return success, memory may have been
- * allocated in the DBT. This seems to happen when DB_DBT_MALLOC was set,
- * the data being retrieved is larger than the page size, and we got
- * DB_LOCK_DEADLOCK. libdb allocates the memory and then finds itself
- * deadlocked trying to go through the overflow page list. It returns
- * DB_LOCK_DEADLOCK which we've assumed meant that no memory was allocated
- * for the DBT.
- *
- * The following slapi_ch_free frees the memory only when the value is
- * non NULL, which is true if the situation described above occurs.
- */
- slapi_ch_free((void **)&key.data);
- slapi_ch_free((void **)&data.data);
- /* walked of the end of the file */
- if (rc == DB_NOTFOUND) {
- rc = CL5_NOTFOUND;
- goto done;
- }
- /* db error occured while iterating */
- /* On this path, the condition "rc != 0" cannot be false */
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5GetFirstEntry - Failed to get entry; db error - %d %s\n",
- rc, db_strerror(rc));
- rc = CL5_DB_ERROR;
- done:
- /* error occured */
- /* We didn't success in assigning this cursor to the iterator,
- * so we need to free the cursor here */
- if (cursor)
- cursor->c_close(cursor);
- return rc;
- }
- static int
- _cl5GetNextEntry(CL5Entry *entry, void *iterator)
- {
- int rc;
- CL5Iterator *it;
- DBT key = {0}, data = {0};
- PR_ASSERT(entry && iterator);
- it = (CL5Iterator *)iterator;
- key.flags = DB_DBT_MALLOC;
- data.flags = DB_DBT_MALLOC;
- while ((rc = it->cursor->c_get(it->cursor, &key, &data, DB_NEXT)) == 0) {
- if (cl5HelperEntry((char *)key.data, NULL)) {
- slapi_ch_free(&(key.data));
- slapi_ch_free(&(data.data));
- continue;
- }
- slapi_ch_free(&(key.data));
- /* format entry */
- rc = cl5DBData2Entry(data.data, data.size, entry);
- slapi_ch_free(&(data.data));
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5GetNextEntry - Failed to format entry: %d\n", rc);
- }
- return rc;
- }
- /*
- * Bug 430172 - memory leaks after db "get" deadlocks, e.g. in CL5 trim
- * Even when db->c_get() does not return success, memory may have been
- * allocated in the DBT. This seems to happen when DB_DBT_MALLOC was set,
- * the data being retrieved is larger than the page size, and we got
- * DB_LOCK_DEADLOCK. libdb allocates the memory and then finds itself
- * deadlocked trying to go through the overflow page list. It returns
- * DB_LOCK_DEADLOCK which we've assumed meant that no memory was allocated
- * for the DBT.
- *
- * The following slapi_ch_free frees the memory only when the value is
- * non NULL, which is true if the situation described above occurs.
- */
- slapi_ch_free((void **)&key.data);
- slapi_ch_free((void **)&data.data);
- /* walked of the end of the file or entry is out of range */
- if (rc == 0 || rc == DB_NOTFOUND) {
- return CL5_NOTFOUND;
- }
- /* cursor operation failed */
- slapi_log_err(rc == CL5_DB_LOCK_ERROR ? SLAPI_LOG_REPL : SLAPI_LOG_ERR,
- repl_plugin_name_cl,
- "_cl5GetNextEntry - Failed to get entry; db error - %d %s\n",
- rc, db_strerror(rc));
- return rc;
- }
- static int
- _cl5CurrentDeleteEntry(void *iterator)
- {
- int rc;
- CL5Iterator *it;
- CL5DBFile *file;
- PR_ASSERT(iterator);
- it = (CL5Iterator *)iterator;
- rc = it->cursor->c_del(it->cursor, 0);
- if (rc == 0) {
- /* decrement entry count */
- file = (CL5DBFile *)object_get_data(it->file);
- PR_AtomicDecrement(&file->entryCount);
- return CL5_SUCCESS;
- } else {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5CurrentDeleteEntry - Failed, err=%d %s\n",
- rc, db_strerror(rc));
- /*
- * We don't free(close) the cursor here, as the caller will free it by
- * a call to cl5DestroyIterator. Freeing it here is a potential bug,
- * as the cursor can't be referenced later once freed.
- */
- return rc;
- }
- }
- static PRBool
- _cl5IsValidIterator(const CL5Iterator *iterator)
- {
- return (iterator && iterator->cursor && iterator->file);
- }
- static int
- _cl5GetOperation(Object *replica, slapi_operation_parameters *op)
- {
- int rc;
- DBT key = {0}, data = {0};
- CL5DBFile *file;
- CL5Entry entry;
- Object *obj = NULL;
- char csnStr[CSN_STRSIZE];
- rc = _cl5GetDBFile(replica, &obj);
- if (rc != CL5_SUCCESS || !obj) {
- goto done;
- }
- file = (CL5DBFile *)object_get_data(obj);
- PR_ASSERT(file);
- /* construct the key */
- key.data = csn_as_string(op->csn, PR_FALSE, csnStr);
- key.size = CSN_STRSIZE;
- data.flags = DB_DBT_MALLOC;
- rc = file->db->get(file->db, NULL /*txn*/, &key, &data, 0);
- switch (rc) {
- case 0:
- entry.op = op;
- /* Callers of this function should cl5_operation_parameters_done(op) */
- rc = cl5DBData2Entry(data.data, data.size, &entry);
- if (rc == CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5GetOperation - Successfully retrieved operation with csn (%s)\n",
- csnStr);
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5GetOperation - Failed to convert db data to operation;"
- " csn - %s\n",
- csnStr);
- }
- goto done;
- case DB_NOTFOUND:
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5GetOperation - Operation for csn (%s) is not found in db that should contain dn (%s)\n",
- csnStr, REPL_GET_DN(&op->target_address));
- rc = CL5_NOTFOUND;
- goto done;
- default:
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5GetOperation - Failed to get entry for csn (%s); "
- "db error - %d %s\n",
- csnStr, rc, db_strerror(rc));
- rc = CL5_DB_ERROR;
- goto done;
- }
- done:
- if (obj)
- object_release(obj);
- slapi_ch_free(&(data.data));
- return rc;
- }
- PRBool
- cl5HelperEntry(const char *csnstr, CSN *csnp)
- {
- CSN *csn;
- time_t csnTime;
- PRBool retval = PR_FALSE;
- if (csnp) {
- csn = csnp;
- } else {
- csn = csn_new_by_string(csnstr);
- }
- if (csn == NULL) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5HelperEntry - Failed to get csn time; csn error\n");
- return PR_FALSE;
- }
- csnTime = csn_get_time(csn);
- if (csnTime == ENTRY_COUNT_TIME || csnTime == PURGE_RUV_TIME) {
- retval = PR_TRUE;
- }
- if (NULL == csnp)
- csn_free(&csn);
- return retval;
- }
- #ifdef FOR_DEBUGGING
- /* Replay iteration helper functions */
- static PRBool
- _cl5ValidReplayIterator(const CL5ReplayIterator *iterator)
- {
- if (iterator == NULL ||
- iterator->consumerRuv == NULL || iterator->supplierRuvObj == NULL ||
- iterator->fileObj == NULL)
- return PR_FALSE;
- return PR_TRUE;
- }
- #endif
- /* Algorithm: ONREPL!!!
- */
- struct replica_hash_entry
- {
- ReplicaId rid; /* replica id */
- PRBool sendChanges; /* indicates whether changes should be sent for this replica */
- };
- static int
- _cl5PositionCursorForReplay(ReplicaId consumerRID, const RUV *consumerRuv, Object *replica, Object *fileObj, CL5ReplayIterator **iterator, int *continue_on_missing)
- {
- CLC_Buffer *clcache = NULL;
- CL5DBFile *file;
- CSN *startCSN = NULL;
- char csnStr[CSN_STRSIZE];
- int rc = CL5_SUCCESS;
- Object *supplierRuvObj = NULL;
- RUV *supplierRuv = NULL;
- PRBool haveChanges = PR_FALSE;
- char *agmt_name;
- PR_ASSERT(consumerRuv && replica && fileObj && iterator);
- csnStr[0] = '\0';
- file = (CL5DBFile *)object_get_data(fileObj);
- /* get supplier's RUV */
- supplierRuvObj = replica_get_ruv((Replica *)object_get_data(replica));
- PR_ASSERT(supplierRuvObj);
- if (!supplierRuvObj) {
- rc = CL5_UNKNOWN_ERROR;
- goto done;
- }
- supplierRuv = (RUV *)object_get_data(supplierRuvObj);
- PR_ASSERT(supplierRuv);
- agmt_name = get_thread_private_agmtname();
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5PositionCursorForReplay - (%s): Consumer RUV:\n", agmt_name);
- ruv_dump(consumerRuv, agmt_name, NULL);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5PositionCursorForReplay - (%s): Supplier RUV:\n", agmt_name);
- ruv_dump(supplierRuv, agmt_name, NULL);
- }
- /* initialize the changelog buffer and do the initial load */
- rc = clcache_get_buffer(&clcache, file->db, consumerRID, consumerRuv, supplierRuv);
- if (rc != 0)
- goto done;
- rc = clcache_load_buffer(clcache, &startCSN, continue_on_missing);
- if (rc == 0) {
- haveChanges = PR_TRUE;
- rc = CL5_SUCCESS;
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- csn_as_string(startCSN, PR_FALSE, csnStr);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
- }
- } else if (rc == DB_NOTFOUND) {
- /* buffer not loaded.
- * either because no changes have to be sent ==> startCSN is NULL
- * or the calculated startCSN cannot be found in the changelog
- */
- if (startCSN == NULL) {
- rc = CL5_NOTFOUND;
- goto done;
- }
- /* check whether this csn should be present */
- rc = _cl5CheckMissingCSN(startCSN, supplierRuv, file);
- if (rc == CL5_MISSING_DATA) /* we should have had the change but we don't */
- {
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- csn_as_string(startCSN, PR_FALSE, csnStr);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "repl_plugin_name_cl - %s: CSN %s not found, seems to be missing\n", agmt_name, csnStr);
- }
- } else /* we are not as up to date or we purged */
- {
- csn_as_string(startCSN, PR_FALSE, csnStr);
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "repl_plugin_name_cl - %s: CSN %s not found, we aren't as up to date, or we purged\n",
- agmt_name, csnStr);
- }
- } else {
- csn_as_string(startCSN, PR_FALSE, csnStr);
- /* db error */
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "repl_plugin_name_cl - %s: Failed to retrieve change with CSN %s; db error - %d %s\n",
- agmt_name, csnStr, rc, db_strerror(rc));
- rc = CL5_DB_ERROR;
- }
- /* setup the iterator */
- if (haveChanges) {
- *iterator = (CL5ReplayIterator *)slapi_ch_calloc(1, sizeof(CL5ReplayIterator));
- if (*iterator == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5PositionCursorForReplay - %s - Failed to allocate iterator\n", agmt_name);
- rc = CL5_MEMORY_ERROR;
- goto done;
- }
- /* ONREPL - should we make a copy of both RUVs here ?*/
- (*iterator)->fileObj = fileObj;
- (*iterator)->clcache = clcache;
- clcache = NULL;
- (*iterator)->consumerRID = consumerRID;
- (*iterator)->consumerRuv = consumerRuv;
- (*iterator)->supplierRuvObj = supplierRuvObj;
- } else if (rc == CL5_SUCCESS) {
- /* we have no changes to send */
- rc = CL5_NOTFOUND;
- }
- done:
- if (clcache)
- clcache_return_buffer(&clcache);
- if (rc != CL5_SUCCESS) {
- if (supplierRuvObj)
- object_release(supplierRuvObj);
- }
- return rc;
- }
- struct ruv_it
- {
- CSN **csns; /* csn list */
- int alloc; /* allocated size */
- int pos; /* position in the list */
- };
- static int
- ruv_consumer_iterator(const ruv_enum_data *enum_data, void *arg)
- {
- struct ruv_it *data = (struct ruv_it *)arg;
- PR_ASSERT(data);
- /* check if we have space for one more element */
- if (data->pos >= data->alloc - 2) {
- data->alloc += 4;
- data->csns = (CSN **)slapi_ch_realloc((void *)data->csns, data->alloc * sizeof(CSN *));
- }
- data->csns[data->pos] = csn_dup(enum_data->csn);
- data->pos++;
- return 0;
- }
- static int
- ruv_supplier_iterator(const ruv_enum_data *enum_data, void *arg)
- {
- int i;
- PRBool found = PR_FALSE;
- ReplicaId rid;
- struct ruv_it *data = (struct ruv_it *)arg;
- PR_ASSERT(data);
- rid = csn_get_replicaid(enum_data->min_csn);
- /* check if the replica that generated the csn is already in the list */
- for (i = 0; i < data->pos; i++) {
- if (rid == csn_get_replicaid(data->csns[i])) {
- found = PR_TRUE;
- /* remove datacsn[i] if it is greater or equal to the supplier's maxcsn */
- if (csn_compare(data->csns[i], enum_data->csn) >= 0) {
- int j;
- csn_free(&data->csns[i]);
- for (j = i + 1; j < data->pos; j++) {
- data->csns[j - 1] = data->csns[j];
- }
- data->pos--;
- }
- break;
- }
- }
- if (!found) {
- /* check if we have space for one more element */
- if (data->pos >= data->alloc - 2) {
- data->alloc += 4;
- data->csns = (CSN **)slapi_ch_realloc((void *)data->csns,
- data->alloc * sizeof(CSN *));
- }
- data->csns[data->pos] = csn_dup(enum_data->min_csn);
- data->pos++;
- }
- return 0;
- }
- static int
- my_csn_compare(const void *arg1, const void *arg2)
- {
- return (csn_compare(*((CSN **)arg1), *((CSN **)arg2)));
- }
- /* builds CSN ordered list of all csns in the RUV */
- CSN **
- cl5BuildCSNList(const RUV *consRuv, const RUV *supRuv)
- {
- struct ruv_it data;
- int count, rc;
- CSN **csns;
- PR_ASSERT(consRuv);
- count = ruv_replica_count(consRuv);
- csns = (CSN **)slapi_ch_calloc(count + 1, sizeof(CSN *));
- data.csns = csns;
- data.alloc = count + 1;
- data.pos = 0;
- /* add consumer elements to the list */
- rc = ruv_enumerate_elements(consRuv, ruv_consumer_iterator, &data);
- if (rc == 0 && supRuv) {
- /* add supplier elements to the list */
- rc = ruv_enumerate_elements(supRuv, ruv_supplier_iterator, &data);
- }
- /* we have no csns */
- if (data.csns[0] == NULL) {
- /* csns might have been realloced in ruv_supplier_iterator() */
- slapi_ch_free((void **)&data.csns);
- csns = NULL;
- } else {
- csns = data.csns;
- data.csns[data.pos] = NULL;
- if (rc == 0) {
- qsort(csns, data.pos, sizeof(CSN *), my_csn_compare);
- } else {
- cl5DestroyCSNList(&csns);
- }
- }
- return csns;
- }
- void
- cl5DestroyCSNList(CSN ***csns)
- {
- if (csns && *csns) {
- int i;
- for (i = 0; (*csns)[i]; i++) {
- csn_free(&(*csns)[i]);
- }
- slapi_ch_free((void **)csns);
- }
- }
- /* A csn should be in the changelog if it is larger than purge vector csn for the same
- replica and is smaller than the csn in supplier's ruv for the same replica.
- The functions returns
- CL5_PURGED if data was purged from the changelog or was never logged
- because it was loaded as part of replica initialization
- CL5_MISSING if the data erouneously missing
- CL5_SUCCESS if that has not and should not been seen by the server
- */
- static int
- _cl5CheckMissingCSN(const CSN *csn, const RUV *supplierRuv, CL5DBFile *file)
- {
- ReplicaId rid;
- CSN *supplierCsn = NULL;
- CSN *purgeCsn = NULL;
- int rc = CL5_SUCCESS;
- char csnStr[CSN_STRSIZE];
- PR_ASSERT(csn && supplierRuv && file);
- rid = csn_get_replicaid(csn);
- ruv_get_largest_csn_for_replica(supplierRuv, rid, &supplierCsn);
- if (supplierCsn == NULL) {
- /* we have not seen any changes from this replica so it is
- ok not to have this csn */
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN - "
- "can't locate %s csn: we have not seen any changes for replica %d\n",
- csn_as_string(csn, PR_FALSE, csnStr), rid);
- }
- return CL5_SUCCESS;
- }
- ruv_get_largest_csn_for_replica(file->purgeRUV, rid, &purgeCsn);
- if (purgeCsn == NULL) {
- /* changelog never contained any changes for this replica */
- if (csn_compare(csn, supplierCsn) <= 0) {
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN - "
- "the change with %s csn was never logged because it was imported "
- "during replica initialization\n",
- csn_as_string(csn, PR_FALSE, csnStr));
- }
- rc = CL5_PURGED_DATA; /* XXXggood is that the correct return value? */
- } else {
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN - "
- "change with %s csn has not yet been seen by this server; "
- " last csn seen from that replica is %s\n",
- csn_as_string(csn, PR_FALSE, csnStr),
- csn_as_string(supplierCsn, PR_FALSE, csnStr));
- }
- rc = CL5_SUCCESS;
- }
- } else /* we have both purge and supplier csn */
- {
- if (csn_compare(csn, purgeCsn) < 0) /* the csn is below the purge point */
- {
- rc = CL5_PURGED_DATA;
- } else {
- if (csn_compare(csn, supplierCsn) <= 0) /* we should have the data but we don't */
- {
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN - "
- "change with %s csn has been purged by this server; "
- "the current purge point for that replica is %s\n",
- csn_as_string(csn, PR_FALSE, csnStr),
- csn_as_string(purgeCsn, PR_FALSE, csnStr));
- }
- rc = CL5_MISSING_DATA;
- } else {
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN - "
- "change with %s csn has not yet been seen by this server; "
- " last csn seen from that replica is %s\n",
- csn_as_string(csn, PR_FALSE, csnStr),
- csn_as_string(supplierCsn, PR_FALSE, csnStr));
- }
- rc = CL5_SUCCESS;
- }
- }
- }
- if (supplierCsn)
- csn_free(&supplierCsn);
- if (purgeCsn)
- csn_free(&purgeCsn);
- return rc;
- }
- /* Helper functions that work with individual changelog files */
- /* file name format : <replica name>_<replica generation>db{2,3,...} */
- static PRBool
- _cl5FileName2Replica(const char *file_name, Object **replica)
- {
- Replica *r;
- char *repl_name, *file_gen, *repl_gen;
- int len;
- PR_ASSERT(file_name && replica);
- *replica = NULL;
- /* this is database file */
- if (_cl5FileEndsWith(file_name, DB_EXTENSION) ||
- _cl5FileEndsWith(file_name, DB_EXTENSION_DB4) ||
- _cl5FileEndsWith(file_name, DB_EXTENSION_DB3)) {
- repl_name = slapi_ch_strdup(file_name);
- file_gen = strstr(repl_name, FILE_SEP);
- if (file_gen) {
- int extlen = strlen(DB_EXTENSION);
- *file_gen = '\0';
- file_gen += strlen(FILE_SEP);
- len = strlen(file_gen);
- if (len <= extlen + 1) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5FileName2Replica - "
- "Invalid file name (%s)\n",
- file_name);
- } else {
- /* get rid of the file extension */
- file_gen[len - extlen - 1] = '\0';
- *replica = replica_get_by_name(repl_name);
- if (*replica) {
- /* check that generation matches the one in replica object */
- r = (Replica *)object_get_data(*replica);
- repl_gen = replica_get_generation(r);
- PR_ASSERT(repl_gen);
- if (strcmp(file_gen, repl_gen) != 0) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5FileName2Replica - "
- "Replica generation mismatch for replica at (%s), "
- "file generation %s, new replica generation %s\n",
- slapi_sdn_get_dn(replica_get_root(r)), file_gen, repl_gen);
- object_release(*replica);
- *replica = NULL;
- }
- slapi_ch_free((void **)&repl_gen);
- }
- }
- slapi_ch_free((void **)&repl_name);
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5FileName2Replica - "
- "Malformed file name - %s\n",
- file_name);
- }
- return PR_TRUE;
- } else
- return PR_FALSE;
- }
- /* file name format : <replica name>_<replica generation>db{2,3} */
- static char *
- _cl5Replica2FileName(Object *replica)
- {
- const char *replName;
- char *replGen, *fileName;
- Replica *r;
- PR_ASSERT(replica);
- r = (Replica *)object_get_data(replica);
- PR_ASSERT(r);
- replName = replica_get_name(r);
- replGen = replica_get_generation(r);
- fileName = _cl5MakeFileName(replName, replGen);
- slapi_ch_free((void **)&replGen);
- return fileName;
- }
- static char *
- _cl5MakeFileName(const char *replName, const char *replGen)
- {
- char *fileName = slapi_ch_smprintf("%s/%s%s%s.%s",
- s_cl5Desc.dbDir, replName,
- FILE_SEP, replGen, DB_EXTENSION);
- return fileName;
- }
- /* open file that corresponds to a particular database */
- static int
- _cl5DBOpenFile(Object *replica, Object **obj, PRBool checkDups)
- {
- int rc;
- const char *replName;
- char *replGen;
- Replica *r;
- PR_ASSERT(replica);
- r = (Replica *)object_get_data(replica);
- replName = replica_get_name(r);
- PR_ASSERT(replName);
- replGen = replica_get_generation(r);
- PR_ASSERT(replGen);
- rc = _cl5DBOpenFileByReplicaName(replName, replGen, obj, checkDups);
- slapi_ch_free((void **)&replGen);
- return rc;
- }
- static int
- _cl5DBOpenFileByReplicaName(const char *replName, const char *replGen, Object **obj, PRBool checkDups)
- {
- int rc = CL5_SUCCESS;
- Object *tmpObj;
- CL5DBFile *file;
- char *file_name;
- PR_ASSERT(replName && replGen);
- if (checkDups) {
- PR_Lock(s_cl5Desc.fileLock);
- file_name = _cl5MakeFileName(replName, replGen);
- tmpObj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, file_name);
- slapi_ch_free((void **)&file_name);
- if (tmpObj) /* this file already exist */
- {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DBOpenFileByReplicaName - Found DB object %p for replica %s\n", tmpObj, replName);
- /* if we were asked for file handle - keep the handle */
- if (obj) {
- *obj = tmpObj;
- } else {
- object_release(tmpObj);
- }
- rc = CL5_SUCCESS;
- goto done;
- }
- }
- rc = _cl5NewDBFile(replName, replGen, &file);
- if (rc == CL5_SUCCESS) {
- /* This creates the file but doesn't set the init flag
- * The flag is set later when the purge and max ruvs are set.
- * This is to prevent some thread to get file access before the
- * structure is fully initialized */
- rc = _cl5AddDBFile(file, &tmpObj);
- if (rc == CL5_SUCCESS) {
- /* read purge RUV - done here because it needs file object rather than file pointer */
- rc = _cl5ReadRUV(replGen, tmpObj, PR_TRUE);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DBOpenFileByReplicaName - Failed to get purge RUV\n");
- goto done;
- }
- /* read ruv that represents the upper bound of the changes stored in the file */
- rc = _cl5ReadRUV(replGen, tmpObj, PR_FALSE);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DBOpenFileByReplicaName - Failed to get upper bound RUV\n");
- goto done;
- }
- /* Mark the DB File initialize */
- _cl5DBFileInitialized(tmpObj);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DBOpenFileByReplicaName - Created new DB object %p\n", tmpObj);
- if (obj) {
- *obj = tmpObj;
- } else {
- object_release(tmpObj);
- }
- }
- }
- done:;
- if (rc != CL5_SUCCESS) {
- if (file)
- _cl5DBCloseFile((void **)&file);
- }
- if (checkDups) {
- PR_Unlock(s_cl5Desc.fileLock);
- }
- return rc;
- }
- /* adds file to the db file list */
- static int
- _cl5AddDBFile(CL5DBFile *file, Object **obj)
- {
- int rc;
- Object *tmpObj;
- PR_ASSERT(file);
- tmpObj = object_new(file, _cl5DBCloseFile);
- rc = objset_add_obj(s_cl5Desc.dbFiles, tmpObj);
- if (rc != OBJSET_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5AddDBFile - Failed to add db file to the list; "
- "repl_objset error - %d\n",
- rc);
- object_release(tmpObj);
- return CL5_OBJSET_ERROR;
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5AddDBFile - Added new DB object %p\n", tmpObj);
- }
- if (obj) {
- *obj = tmpObj;
- } else
- object_release(tmpObj);
- return CL5_SUCCESS;
- }
- static int
- _cl5NewDBFile(const char *replName, const char *replGen, CL5DBFile **dbFile)
- {
- int rc;
- DB *db = NULL;
- char *name;
- #ifdef HPUX
- char cwd[PATH_MAX + 1];
- #endif
- PR_ASSERT(replName && replGen && dbFile);
- if (!dbFile) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5NewDBFile - NULL dbFile\n");
- return CL5_UNKNOWN_ERROR;
- }
- (*dbFile) = (CL5DBFile *)slapi_ch_calloc(1, sizeof(CL5DBFile));
- if (*dbFile == NULL) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5NewDBFile - memory allocation failed\n");
- return CL5_MEMORY_ERROR;
- }
- name = _cl5MakeFileName(replName, replGen);
- {
- /* The subname argument allows applications to have
- * subdatabases, i.e., multiple databases inside of a single
- * physical file. This is useful when the logical databases
- * are both numerous and reasonably small, in order to
- * avoid creating a large number of underlying files.
- */
- char *subname = NULL;
- DB_ENV *dbEnv = s_cl5Desc.dbEnv;
- rc = db_create(&db, dbEnv, 0);
- if (0 != rc) {
- goto out;
- }
- rc = db->set_pagesize(
- db,
- s_cl5Desc.dbConfig.pageSize);
- if (0 != rc) {
- goto out;
- }
- DB_OPEN(s_cl5Desc.dbEnvOpenFlags,
- db, NULL /* txnid */, name, subname, DB_BTREE,
- DB_CREATE | DB_THREAD, s_cl5Desc.dbConfig.fileMode, rc);
- }
- out:
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5NewDBFile - db_open failed; db error - %d %s\n",
- rc, db_strerror(rc));
- rc = CL5_DB_ERROR;
- goto done;
- }
- (*dbFile)->db = db;
- (*dbFile)->name = name;
- name = NULL; /* transfer ownership to dbFile struct */
- (*dbFile)->replName = slapi_ch_strdup(replName);
- (*dbFile)->replGen = slapi_ch_strdup(replGen);
- /* compute number of entries in the file */
- /* ONREPL - to improve performance, we keep entry count in memory
- and write it down during shutdown. Problem: this will not
- work with multiple processes. Do we have to worry about that?
- */
- if (s_cl5Desc.dbOpenMode == CL5_OPEN_NORMAL) {
- rc = _cl5GetEntryCount(*dbFile);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5NewDBFile - Failed to get entry count\n");
- goto done;
- }
- }
- done:
- if (rc != CL5_SUCCESS) {
- _cl5DBCloseFile((void **)dbFile);
- /* slapi_ch_free accepts NULL pointer */
- slapi_ch_free((void **)&name);
- slapi_ch_free((void **)dbFile);
- }
- return rc;
- }
- static void
- _cl5DBCloseFile(void **data)
- {
- CL5DBFile *file;
- int rc = 0;
- PR_ASSERT(data);
- file = *(CL5DBFile **)data;
- PR_ASSERT(file);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBCloseFile - "
- "Closing database %s\n",
- file->name);
- /* close the file */
- /* if this is normal close or close after import, update entry count */
- if ((s_cl5Desc.dbOpenMode == CL5_OPEN_NORMAL && s_cl5Desc.dbState == CL5_STATE_CLOSING) ||
- s_cl5Desc.dbOpenMode == CL5_OPEN_LDIF2CL) {
- _cl5WriteEntryCount(file);
- _cl5WriteRUV(file, PR_TRUE);
- _cl5WriteRUV(file, PR_FALSE);
- }
- /* close the db */
- if (file->db) {
- rc = file->db->close(file->db, 0);
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "_cl5DBCloseFile - "
- "Closed the changelog database handle for %s "
- "(rc: %d)\n",
- file->name, rc);
- file->db = NULL;
- }
- if (file->flags & DB_FILE_DELETED) {
- /* We need to use the libdb API to delete the files, otherwise we'll
- * run into problems when we try to checkpoint transactions later. */
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBCloseFile - "
- "removing the changelog %s (flag %d)\n",
- file->name, DEFAULT_DB_ENV_OP_FLAGS);
- rc = s_cl5Desc.dbEnv->dbremove(s_cl5Desc.dbEnv, 0, file->name, 0,
- DEFAULT_DB_ENV_OP_FLAGS);
- if (rc != 0) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBCloseFile - "
- "failed to remove (%s) file; libdb error - %d (%s)\n",
- file->name, rc, db_strerror(rc));
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBCloseFile - "
- "Deleted the changelog database file %s\n",
- file->name);
- }
- }
- /* slapi_ch_free accepts NULL pointer */
- slapi_ch_free((void **)&file->name);
- slapi_ch_free((void **)&file->replName);
- slapi_ch_free((void **)&file->replGen);
- ruv_destroy(&file->maxRUV);
- ruv_destroy(&file->purgeRUV);
- file->db = NULL;
- slapi_ch_free(data);
- }
- static int
- _cl5GetDBFile(Object *replica, Object **obj)
- {
- char *fileName;
- PR_ASSERT(replica && obj);
- fileName = _cl5Replica2FileName(replica);
- *obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
- if (*obj) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetDBFile - "
- "found DB object %p for database %s\n",
- *obj, fileName);
- slapi_ch_free_string(&fileName);
- return CL5_SUCCESS;
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetDBFile - "
- "no DB object found for database %s\n",
- fileName);
- slapi_ch_free_string(&fileName);
- return CL5_NOTFOUND;
- }
- }
- static int
- _cl5GetDBFileByReplicaName(const char *replName, const char *replGen, Object **obj)
- {
- char *fileName;
- PR_ASSERT(replName && replGen && obj);
- fileName = _cl5MakeFileName(replName, replGen);
- *obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
- if (*obj) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetDBFileByReplicaName - "
- "found DB object %p for database %s\n",
- *obj, fileName);
- slapi_ch_free_string(&fileName);
- return CL5_SUCCESS;
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetDBFileByReplicaName - "
- "no DB object found for database %s\n",
- fileName);
- slapi_ch_free_string(&fileName);
- return CL5_NOTFOUND;
- }
- }
- static void
- _cl5DBDeleteFile(Object *obj)
- {
- CL5DBFile *file;
- int rc = 0;
- PR_ASSERT(obj);
- file = (CL5DBFile *)object_get_data(obj);
- PR_ASSERT(file);
- file->flags |= DB_FILE_DELETED;
- rc = objset_remove_obj(s_cl5Desc.dbFiles, obj);
- if (rc != OBJSET_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBDeleteFile - "
- "could not find DB object %p\n",
- obj);
- } else {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBDeleteFile - "
- "removed DB object %p\n",
- obj);
- }
- object_release(obj);
- }
- static void
- _cl5DBFileInitialized(Object *obj)
- {
- CL5DBFile *file;
- PR_ASSERT(obj);
- file = (CL5DBFile *)object_get_data(obj);
- PR_ASSERT(file);
- file->flags |= DB_FILE_INIT;
- }
- static int
- _cl5CompareDBFile(Object *el1, const void *el2)
- {
- CL5DBFile *file;
- const char *name;
- PR_ASSERT(el1 && el2);
- file = (CL5DBFile *)object_get_data(el1);
- name = (const char *)el2;
- return ((file->flags & DB_FILE_INIT) ? strcmp(file->name, name) : 1);
- }
- /*
- * return 1: true (the "filename" ends with "ext")
- * return 0: false
- */
- static int
- _cl5FileEndsWith(const char *filename, const char *ext)
- {
- char *p = NULL;
- int flen = strlen(filename);
- int elen = strlen(ext);
- if (0 == flen || 0 == elen) {
- return 0;
- }
- p = PL_strrstr(filename, ext);
- if (NULL == p) {
- return 0;
- }
- if (p - filename + elen == flen) {
- return 1;
- }
- return 0;
- }
- static int
- _cl5ExportFile(PRFileDesc *prFile, Object *obj)
- {
- int rc;
- void *iterator = NULL;
- slapi_operation_parameters op = {0};
- char *buff;
- PRInt32 len, wlen;
- CL5Entry entry;
- CL5DBFile *file;
- PR_ASSERT(prFile && obj);
- file = (CL5DBFile *)object_get_data(obj);
- PR_ASSERT(file);
- if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
- ruv_dump(file->purgeRUV, "clpurgeruv", prFile);
- ruv_dump(file->maxRUV, "clmaxruv", prFile);
- }
- slapi_write_buffer(prFile, "\n", strlen("\n"));
- entry.op = &op;
- rc = _cl5GetFirstEntry(obj, &entry, &iterator, NULL);
- while (rc == CL5_SUCCESS) {
- rc = _cl5Operation2LDIF(&op, file->replGen, &buff, &len);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5ExportFile - Failed to convert operation to ldif\n");
- operation_parameters_done(&op);
- break;
- }
- wlen = slapi_write_buffer(prFile, buff, len);
- slapi_ch_free((void **)&buff);
- if (wlen < len) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5ExportFile - Failed to write to ldif file\n");
- rc = CL5_SYSTEM_ERROR;
- operation_parameters_done(&op);
- break;
- }
- cl5_operation_parameters_done(&op);
- rc = _cl5GetNextEntry(&entry, iterator);
- }
- cl5_operation_parameters_done(&op);
- if (iterator)
- cl5DestroyIterator(iterator);
- if (rc != CL5_NOTFOUND) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "_cl5ExportFile - Failed to retrieve changelog entry\n");
- } else {
- rc = CL5_SUCCESS;
- }
- return rc;
- }
- static PRBool
- _cl5ReplicaInList(Object *replica, Object **replicas)
- {
- int i;
- PR_ASSERT(replica && replicas);
- /* ONREPL I think it should be sufficient to just compare replica pointers */
- for (i = 0; replicas[i]; i++) {
- if (replica == replicas[i])
- return PR_TRUE;
- }
- return PR_FALSE;
- }
- static char *
- _cl5GetHelperEntryKey(int type, char *csnStr)
- {
- CSN *csn = csn_new();
- char *rt;
- csn_set_time(csn, (time_t)type);
- csn_set_replicaid(csn, 0);
- rt = csn_as_string(csn, PR_FALSE, csnStr);
- csn_free(&csn);
- return rt;
- }
- static Object *
- _cl5GetReplica(const slapi_operation_parameters *op, const char *replGen)
- {
- Slapi_DN *sdn;
- Object *replObj;
- Replica *replica;
- char *newGen;
- PR_ASSERT(op && replGen);
- sdn = op->target_address.sdn;
- replObj = replica_get_replica_from_dn(sdn);
- if (replObj) {
- /* check to see if replica generation has not change */
- replica = (Replica *)object_get_data(replObj);
- PR_ASSERT(replica);
- newGen = replica_get_generation(replica);
- PR_ASSERT(newGen);
- if (strcmp(replGen, newGen) != 0) {
- object_release(replObj);
- replObj = NULL;
- }
- slapi_ch_free((void **)&newGen);
- }
- return replObj;
- }
- int
- cl5_is_diskfull()
- {
- int rc;
- PR_Lock(cl5_diskfull_lock);
- rc = cl5_diskfull_flag;
- PR_Unlock(cl5_diskfull_lock);
- return rc;
- }
- static void
- cl5_set_diskfull(void)
- {
- PR_Lock(cl5_diskfull_lock);
- cl5_diskfull_flag = 1;
- PR_Unlock(cl5_diskfull_lock);
- }
- static void
- cl5_set_no_diskfull(void)
- {
- PR_Lock(cl5_diskfull_lock);
- cl5_diskfull_flag = 0;
- PR_Unlock(cl5_diskfull_lock);
- }
- int
- cl5_diskspace_is_available()
- {
- int rval = 1;
- #if defined(OS_solaris) || defined(hpux)
- struct statvfs fsbuf;
- if (statvfs(s_cl5Desc.dbDir, &fsbuf) < 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5_diskspace_is_available - Cannot get file system info\n");
- rval = 0;
- } else {
- unsigned long fsiz = fsbuf.f_bavail * fsbuf.f_frsize;
- if (fsiz < NO_DISK_SPACE) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5_diskspace_is_available - No enough diskspace for changelog: (%u bytes free)\n", fsiz);
- rval = 0;
- } else if (fsiz > MIN_DISK_SPACE) {
- /* assume recovered */
- cl5_set_no_diskfull();
- }
- }
- #endif
- #if defined(linux)
- struct statfs fsbuf;
- if (statfs(s_cl5Desc.dbDir, &fsbuf) < 0) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5_diskspace_is_available - Cannot get file system info\n");
- rval = 0;
- } else {
- unsigned long fsiz = fsbuf.f_bavail * fsbuf.f_bsize;
- if (fsiz < NO_DISK_SPACE) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5_diskspace_is_available - No enough diskspace for changelog: (%lu bytes free)\n", fsiz);
- rval = 0;
- } else if (fsiz > MIN_DISK_SPACE) {
- /* assume recovered */
- cl5_set_no_diskfull();
- }
- }
- #endif
- return rval;
- }
- int
- cl5DbDirIsEmpty(const char *dir)
- {
- PRDir *prDir;
- PRDirEntry *prDirEntry;
- int isempty = 1;
- if (!dir || !*dir) {
- return isempty;
- }
- /* assume failure means it does not exist - other failure
- cases will be handled by code which attempts to create the
- db in this directory */
- if (PR_Access(dir, PR_ACCESS_EXISTS)) {
- return isempty;
- }
- prDir = PR_OpenDir(dir);
- if (prDir == NULL) {
- return isempty; /* assume failure means does not exist */
- }
- while (NULL != (prDirEntry = PR_ReadDir(prDir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) {
- if (NULL == prDirEntry->name) { /* NSPR doesn't behave like the docs say it should */
- break;
- }
- isempty = 0; /* found at least one "real" file */
- break;
- }
- PR_CloseDir(prDir);
- return isempty;
- }
- /*
- * Write RUVs into the changelog;
- * implemented for backup to make sure the backed up changelog contains RUVs
- * Return values: 0 -- success
- * 1 -- failure
- */
- int
- cl5WriteRUV()
- {
- int rc = 0;
- Object *file_obj = NULL;
- CL5DBFile *dbfile = NULL;
- int closeit = 0;
- int slapd_pid = 0;
- changelog5Config config;
- /* read changelog configuration */
- changelog5_read_config(&config);
- if (config.dir == NULL) {
- /* Changelog is not configured; Replication is not enabled.
- * we don't have to update RUVs.
- * bail out - return success */
- goto bail;
- }
- slapd_pid = is_slapd_running();
- if (slapd_pid <= 0) {
- /* I'm not a server, rather a utility.
- * And the server is NOT running.
- * RUVs should be in the changelog.
- * we don't have to update RUVs.
- * bail out - return success */
- goto bail;
- }
- if (getpid() != slapd_pid) {
- /* I'm not a server, rather a utility.
- * And the server IS running.
- * RUVs are not in the changelog and no easy way to retrieve them.
- * bail out - return failure */
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5WriteRUV - server (pid %d) is already running; bail.\n",
- slapd_pid);
- rc = 1;
- goto bail;
- }
- /* file is stored in the changelog directory and is named
- * <replica name>.ldif */
- if (CL5_STATE_OPEN != s_cl5Desc.dbState) {
- rc = _cl5Open(config.dir, &config.dbconfig, CL5_OPEN_NORMAL);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5WriteRUV - Failed to open changelog\n");
- goto bail;
- }
- s_cl5Desc.dbState = CL5_STATE_OPEN; /* force to change the state */
- closeit = 1; /* It had not been opened; close it */
- }
- file_obj = objset_first_obj(s_cl5Desc.dbFiles);
- while (file_obj) {
- dbfile = (CL5DBFile *)object_get_data(file_obj);
- if (dbfile) {
- _cl5WriteEntryCount(dbfile);
- _cl5WriteRUV(dbfile, PR_TRUE);
- _cl5WriteRUV(dbfile, PR_FALSE);
- }
- file_obj = objset_next_obj(s_cl5Desc.dbFiles, file_obj);
- }
- bail:
- if (closeit && (CL5_STATE_OPEN == s_cl5Desc.dbState)) {
- _cl5Close();
- s_cl5Desc.dbState = CL5_STATE_CLOSED; /* force to change the state */
- }
- changelog5_config_done(&config);
- return rc;
- }
- /*
- * Delete RUVs from the changelog;
- * implemented for backup to clean up RUVs
- * Return values: 0 -- success
- * 1 -- failure
- */
- int
- cl5DeleteRUV()
- {
- int rc = 0;
- Object *file_obj = NULL;
- CL5DBFile *dbfile = NULL;
- int slapd_pid = 0;
- int closeit = 0;
- changelog5Config config;
- /* read changelog configuration */
- changelog5_read_config(&config);
- if (config.dir == NULL) {
- /* Changelog is not configured; Replication is not enabled.
- * we don't have to update RUVs.
- * bail out - return success */
- goto bail;
- }
- slapd_pid = is_slapd_running();
- if (slapd_pid <= 0) {
- /* I'm not a server, rather a utility.
- * And the server is NOT running.
- * RUVs should be in the changelog.
- * we don't have to update RUVs.
- * bail out - return success */
- goto bail;
- }
- if (getpid() != slapd_pid) {
- /* I'm not a server, rather a utility.
- * And the server IS running.
- * RUVs are not in the changelog.
- * bail out - return success */
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5DeleteRUV - server (pid %d) is already running; bail.\n",
- slapd_pid);
- goto bail;
- }
- /* file is stored in the changelog directory and is named
- * <replica name>.ldif */
- if (CL5_STATE_OPEN != s_cl5Desc.dbState) {
- rc = _cl5Open(config.dir, &config.dbconfig, CL5_OPEN_NORMAL);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5DeleteRUV - Failed to open changelog\n");
- goto bail;
- }
- s_cl5Desc.dbState = CL5_STATE_OPEN; /* force to change the state */
- closeit = 1; /* It had been opened; no need to close */
- }
- file_obj = objset_first_obj(s_cl5Desc.dbFiles);
- while (file_obj) {
- dbfile = (CL5DBFile *)object_get_data(file_obj);
- /* _cl5GetEntryCount deletes entry count after reading it */
- rc = _cl5GetEntryCount(dbfile);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "cl5DeleteRUV - Failed to get/delete entry count\n");
- goto bail;
- }
- /* _cl5ReadRUV deletes RUV after reading it */
- rc = _cl5ReadRUV(dbfile->replGen, file_obj, PR_TRUE);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5DeleteRUV - Failed to read/delete purge RUV\n");
- goto bail;
- }
- rc = _cl5ReadRUV(dbfile->replGen, file_obj, PR_FALSE);
- if (rc != CL5_SUCCESS) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "cl5DeleteRUV - Failed to read/delete upper bound RUV\n");
- goto bail;
- }
- file_obj = objset_next_obj(s_cl5Desc.dbFiles, file_obj);
- }
- bail:
- if (file_obj) {
- object_release(file_obj);
- }
- if (closeit && (CL5_STATE_OPEN == s_cl5Desc.dbState)) {
- _cl5Close();
- s_cl5Desc.dbState = CL5_STATE_CLOSED; /* force to change the state */
- }
- changelog5_config_done(&config);
- return rc;
- }
- /*
- * Clean the in memory RUV, at shutdown we will write the update to the db
- */
- void
- cl5CleanRUV(ReplicaId rid)
- {
- CL5DBFile *file;
- Object *obj = NULL;
- slapi_rwlock_wrlock(s_cl5Desc.stLock);
- obj = objset_first_obj(s_cl5Desc.dbFiles);
- while (obj) {
- file = (CL5DBFile *)object_get_data(obj);
- ruv_delete_replica(file->purgeRUV, rid);
- ruv_delete_replica(file->maxRUV, rid);
- obj = objset_next_obj(s_cl5Desc.dbFiles, obj);
- }
- slapi_rwlock_unlock(s_cl5Desc.stLock);
- }
- static void
- free_purge_data(cleanruv_purge_data *purge_data)
- {
- slapi_ch_free_string(&purge_data->replGen);
- slapi_ch_free((void **)&purge_data);
- }
- /*
- * Create a thread to purge a changelog of cleaned RIDs
- */
- void
- trigger_cl_purging(cleanruv_purge_data *purge_data)
- {
- PRThread *trim_tid = NULL;
- trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void *)trigger_cl_purging_thread,
- (void *)purge_data, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
- PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
- if (NULL == trim_tid) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "trigger_cl_purging - Failed to create cl purging "
- "thread; NSPR error - %d\n",
- PR_GetError());
- free_purge_data(purge_data);
- } else {
- /* need a little time for the thread to get started */
- DS_Sleep(PR_SecondsToInterval(1));
- }
- }
- /*
- * Purge a changelog of entries that originated from a particular replica(rid)
- */
- void
- trigger_cl_purging_thread(void *arg)
- {
- cleanruv_purge_data *purge_data = (cleanruv_purge_data *)arg;
- /* Make sure we have a change log, and we aren't closing it */
- if (s_cl5Desc.dbState == CL5_STATE_CLOSED ||
- s_cl5Desc.dbState == CL5_STATE_CLOSING) {
- goto free_and_return;
- }
- /* Bump the changelog thread count */
- if (CL5_SUCCESS != _cl5AddThread()) {
- slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name_cl,
- "trigger_cl_purging_thread - Abort, failed to increment thread count "
- "NSPR error - %d\n",
- PR_GetError());
- goto free_and_return;
- }
- /* Purge the changelog */
- _cl5DoPurging(purge_data);
- _cl5RemoveThread();
- slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl,
- "trigger_cl_purging_thread - purged changelog for (%s) rid (%d)\n",
- slapi_sdn_get_dn(purge_data->suffix_sdn), purge_data->cleaned_rid);
- free_and_return:
- free_purge_data(purge_data);
- }
|