Bond.cpp

/*
 * Copyright (c)2013-2020 ZeroTier, Inc.
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file in the project's root directory.
 *
 * Change Date: 2025-01-01
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2.0 of the Apache License.
 */
/****/

#include "Bond.hpp"
#include "../osdep/OSUtils.hpp"
#include "Switch.hpp"

#include <cmath>

namespace ZeroTier {

Bond::Bond(const RuntimeEnvironment* renv, int policy, const SharedPtr<Peer>& peer)
	: RR(renv)
	, _peer(peer)
	, _qosCutoffCount(0)
	, _ackCutoffCount(0)
	, _lastAckRateCheck(0)
	, _lastQoSRateCheck(0)
	, _lastQualityEstimation(0)
	, _lastCheckUserPreferences(0)
	, _lastBackgroundTaskCheck(0)
	, _lastBondStatusLog(0)
	, _lastPathNegotiationReceived(0)
	, _lastPathNegotiationCheck(0)
	, _lastSentPathNegotiationRequest(0)
	, _lastFlowStatReset(0)
	, _lastFlowExpirationCheck(0)
	, _lastFlowRebalance(0)
	, _lastFrame(0)
	, _lastActiveBackupPathChange(0)
{
	setReasonableDefaults(policy, SharedPtr<Bond>(), false);
	_policyAlias = BondController::getPolicyStrByCode(policy);
}

Bond::Bond(const RuntimeEnvironment* renv, std::string& basePolicy, std::string& policyAlias, const SharedPtr<Peer>& peer) : RR(renv), _policyAlias(policyAlias), _peer(peer)
{
	setReasonableDefaults(BondController::getPolicyCodeByStr(basePolicy), SharedPtr<Bond>(), false);
}

Bond::Bond(const RuntimeEnvironment* renv, SharedPtr<Bond> originalBond, const SharedPtr<Peer>& peer)
	: RR(renv)
	, _peer(peer)
	, _lastAckRateCheck(0)
	, _lastQoSRateCheck(0)
	, _lastQualityEstimation(0)
	, _lastCheckUserPreferences(0)
	, _lastBackgroundTaskCheck(0)
	, _lastBondStatusLog(0)
	, _lastPathNegotiationReceived(0)
	, _lastPathNegotiationCheck(0)
	, _lastFlowStatReset(0)
	, _lastFlowExpirationCheck(0)
	, _lastFlowRebalance(0)
	, _lastFrame(0)
{
	setReasonableDefaults(originalBond->_bondingPolicy, originalBond, true);
}

void Bond::nominatePath(const SharedPtr<Path>& path, int64_t now)
{
	char traceMsg[256];
	char pathStr[128];
	path->address().toString(pathStr);
	Mutex::Lock _l(_paths_m);
	if (! RR->bc->linkAllowed(_policyAlias, getLink(path))) {
		return;
	}
	bool alreadyPresent = false;
	for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
		if (path.ptr() == _paths[i].ptr()) {
			// Previously encountered path, not notifying bond
			alreadyPresent = true;
			break;
		}
	}
	if (! alreadyPresent) {
		for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
			if (! _paths[i]) {
				_paths[i] = path;
				sprintf(
					traceMsg,
					"%s (bond) Nominating link %s/%s to peer %llx. It has now entered its trial period",
					OSUtils::humanReadableTimestamp().c_str(),
					getLink(path)->ifname().c_str(),
					pathStr,
					(unsigned long long)(_peer->_id.address().toInt()));
				RR->t->bondStateMessage(NULL, traceMsg);
				_paths[i]->startTrial(now);
				break;
			}
		}
	}
	curateBond(now, true);
	estimatePathQuality(now);
}

SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
{
	Mutex::Lock _l(_paths_m);
	/**
	 * active-backup
	 */
	if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
		if (_abPath) {
			return _abPath;
		}
	}
	/**
	 * broadcast
	 */
	if (_bondingPolicy == ZT_BONDING_POLICY_BROADCAST) {
		return SharedPtr<Path>(); // Handled in Switch::_trySend()
	}
	if (! _numBondedPaths) {
		return SharedPtr<Path>(); // No paths assigned to bond yet, cannot balance traffic
	}
	/**
	 * balance-rr
	 */
	if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
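		// When flow hashing is disabled, either pick a random bonded path for every packet
		// (_packetsPerLink == 0) or stripe _packetsPerLink packets onto the current link
		// before rotating to the next eligible bonded path.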
		if (! _allowFlowHashing) {
			if (_packetsPerLink == 0) {
				// Randomly select a path
				return _paths[_bondedIdx[_freeRandomByte % _numBondedPaths]]; // TODO: Optimize
			}
			if (_rrPacketsSentOnCurrLink < _packetsPerLink) {
				// Continue to use this link
				++_rrPacketsSentOnCurrLink;
				return _paths[_bondedIdx[_rrIdx]];
			}
			// Reset striping counter
			_rrPacketsSentOnCurrLink = 0;
			if (_numBondedPaths == 1) {
				_rrIdx = 0;
			}
			else {
				int _tempIdx = _rrIdx;
				for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) {
					_tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1;
					if (_bondedIdx[_tempIdx] != ZT_MAX_PEER_NETWORK_PATHS) {
						if (_paths[_bondedIdx[_tempIdx]] && _paths[_bondedIdx[_tempIdx]]->eligible(now, _ackSendInterval)) {
							_rrIdx = _tempIdx;
							break;
						}
					}
				}
			}
			if (_paths[_bondedIdx[_rrIdx]]) {
				return _paths[_bondedIdx[_rrIdx]];
			}
		}
	}
	/**
	 * balance-xor
	 */
	if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
		if (! _allowFlowHashing || flowId == -1) {
			// No specific path required for unclassified traffic, send on anything
			return _paths[_bondedIdx[_freeRandomByte % _numBondedPaths]]; // TODO: Optimize
		}
		else if (_allowFlowHashing) {
			// TODO: Optimize
			Mutex::Lock _l(_flows_m);
			SharedPtr<Flow> flow;
			if (_flows.count(flowId)) {
				flow = _flows[flowId];
				flow->updateActivity(now);
			}
			else {
				unsigned char entropy;
				Utils::getSecureRandom(&entropy, 1);
				flow = createFlow(SharedPtr<Path>(), flowId, entropy, now);
			}
			if (flow) {
				return flow->assignedPath();
			}
		}
	}
	return SharedPtr<Path>();
}

void Bond::recordIncomingInvalidPacket(const SharedPtr<Path>& path)
{
	// char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
	// sprintf(traceMsg, "%s (qos) Invalid packet on link %s/%s from peer %llx",
	// OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, (unsigned long long)(_peer->_id.address().toInt()));
	// RR->t->bondStateMessage(NULL, traceMsg);
	Mutex::Lock _l(_paths_m);
	for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
		if (_paths[i] == path) {
			_paths[i]->packetValiditySamples.push(false);
		}
	}
}

void Bond::recordOutgoingPacket(const SharedPtr<Path>& path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now)
{
	// char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
	// sprintf(traceMsg, "%s (bond) Outgoing packet on link %s/%s to peer %llx",
	// OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, (unsigned long long)(_peer->_id.address().toInt()));
	// RR->t->bondStateMessage(NULL, traceMsg);
	_freeRandomByte += (unsigned char)(packetId >> 8); // Grab entropy to use in path selection logic
	if (! _shouldCollectPathStatistics) {
		return;
	}
	bool isFrame = (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME);
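	// Sample this packet for QoS accounting only if its ID is not a multiple of
	// ZT_QOS_ACK_DIVISOR (assuming a power-of-two divisor) and it is not itself an
	// ACK or QoS measurement packet.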
	bool shouldRecord = (packetId & (ZT_QOS_ACK_DIVISOR - 1) && (verb != Packet::VERB_ACK) && (verb != Packet::VERB_QOS_MEASUREMENT));
	if (isFrame || shouldRecord) {
		Mutex::Lock _l(_paths_m);
		if (isFrame) {
			++(path->_packetsOut);
			_lastFrame = now;
		}
		if (shouldRecord) {
			path->_unackedBytes += payloadLength;
			// Take note that we're expecting a VERB_ACK on this path as of a specific time
			if (path->qosStatsOut.size() < ZT_QOS_MAX_OUTSTANDING_RECORDS) {
				path->qosStatsOut[packetId] = now;
			}
		}
	}
	if (_allowFlowHashing && (flowId != ZT_QOS_NO_FLOW)) {
		Mutex::Lock _l(_flows_m);
		if (_flows.count(flowId)) {
			_flows[flowId]->recordOutgoingBytes(payloadLength);
		}
	}
}

void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId, uint16_t payloadLength, Packet::Verb verb, int32_t flowId, int64_t now)
{
	// char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
	// sprintf(traceMsg, "%s (bond) Incoming packet on link %s/%s from peer %llx [id=%llx, len=%d, verb=%d, flowId=%x]",
	// OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, (unsigned long long)(_peer->_id.address().toInt()), packetId, payloadLength, verb, flowId);
	// RR->t->bondStateMessage(NULL, traceMsg);
	bool isFrame = (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME);
	bool shouldRecord = (packetId & (ZT_QOS_ACK_DIVISOR - 1) && (verb != Packet::VERB_ACK) && (verb != Packet::VERB_QOS_MEASUREMENT));
	if (isFrame || shouldRecord) {
		Mutex::Lock _l(_paths_m);
		if (isFrame) {
			++(path->_packetsIn);
			_lastFrame = now;
		}
		if (shouldRecord) {
			path->ackStatsIn[packetId] = payloadLength;
			++(path->_packetsReceivedSinceLastAck);
			path->qosStatsIn[packetId] = now;
			++(path->_packetsReceivedSinceLastQoS);
			path->packetValiditySamples.push(true);
		}
	}
	/**
	 * Learn new flows and pro-actively create entries for them in the bond so
	 * that the next time we send a packet out that is part of a flow we know
	 * which path to use.
	 */
	if ((flowId != ZT_QOS_NO_FLOW) && (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE)) {
		Mutex::Lock _l(_flows_m);
		SharedPtr<Flow> flow;
		if (! _flows.count(flowId)) {
			flow = createFlow(path, flowId, 0, now);
		}
		else {
			flow = _flows[flowId];
		}
		if (flow) {
			flow->recordIncomingBytes(payloadLength);
		}
	}
}

void Bond::receivedQoS(const SharedPtr<Path>& path, int64_t now, int count, uint64_t* rx_id, uint16_t* rx_ts)
{
	Mutex::Lock _l(_paths_m);
	// char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
	// sprintf(traceMsg, "%s (qos) Received QoS packet sampling %d frames from peer %llx via %s/%s",
	// OSUtils::humanReadableTimestamp().c_str(), count, (unsigned long long)(_peer->_id.address().toInt()), getLink(path)->ifname().c_str(), pathStr);
	// RR->t->bondStateMessage(NULL, traceMsg);
	// Look up egress times and compute latency values for each record
	std::map<uint64_t, uint64_t>::iterator it;
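	// Each received record pairs one of our outgoing packet IDs with the remote "holding time";
	// one-way latency is estimated as (time since our egress - remote holding time) / 2.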
	for (int j = 0; j < count; j++) {
		it = path->qosStatsOut.find(rx_id[j]);
		if (it != path->qosStatsOut.end()) {
			path->latencySamples.push(((uint16_t)(now - it->second) - rx_ts[j]) / 2);
			path->qosStatsOut.erase(it);
		}
	}
	path->qosRecordSize.push(count);
}

void Bond::receivedAck(const SharedPtr<Path>& path, int64_t now, int32_t ackedBytes)
{
	Mutex::Lock _l(_paths_m);
	// char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
	// sprintf(traceMsg, "%s (qos) Received ACK packet for %d bytes from peer %llx via %s/%s",
	// OSUtils::humanReadableTimestamp().c_str(), ackedBytes, (unsigned long long)(_peer->_id.address().toInt()), getLink(path)->ifname().c_str(), pathStr);
	// RR->t->bondStateMessage(NULL, traceMsg);
	path->_lastAckReceived = now;
	path->_unackedBytes = (ackedBytes > path->_unackedBytes) ? 0 : path->_unackedBytes - ackedBytes;
	int64_t timeSinceThroughputEstimate = (now - path->_lastThroughputEstimation);
	if (timeSinceThroughputEstimate >= throughputMeasurementInterval) {
		// TODO: See if this floating point math can be reduced
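		// Throughput here is bytes ACKed per second over the measurement window, then scaled
		// down by 1000 before being pushed as a sample.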
		uint64_t throughput = (uint64_t)((float)(path->_bytesAckedSinceLastThroughputEstimation) / ((float)timeSinceThroughputEstimate / (float)1000));
		throughput /= 1000;
		if (throughput > 0.0) {
			path->throughputSamples.push(throughput);
			path->_throughputMax = throughput > path->_throughputMax ? throughput : path->_throughputMax;
		}
		path->_lastThroughputEstimation = now;
		path->_bytesAckedSinceLastThroughputEstimation = 0;
	}
	else {
		path->_bytesAckedSinceLastThroughputEstimation += ackedBytes;
	}
}

int32_t Bond::generateQoSPacket(const SharedPtr<Path>& path, int64_t now, char* qosBuffer)
{
	int32_t len = 0;
	std::map<uint64_t, uint64_t>::iterator it = path->qosStatsIn.begin();
	int i = 0;
	int numRecords = std::min(path->_packetsReceivedSinceLastQoS, ZT_QOS_TABLE_SIZE);
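	// Each emitted record is 10 bytes: the 8-byte packet ID followed by a 16-bit holding
	// time (ms elapsed between receiving that packet and sending this QoS report).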
	while (i < numRecords && it != path->qosStatsIn.end()) {
		uint64_t id = it->first;
		memcpy(qosBuffer, &id, sizeof(uint64_t));
		qosBuffer += sizeof(uint64_t);
		uint16_t holdingTime = (uint16_t)(now - it->second);
		memcpy(qosBuffer, &holdingTime, sizeof(uint16_t));
		qosBuffer += sizeof(uint16_t);
		len += sizeof(uint64_t) + sizeof(uint16_t);
		path->qosStatsIn.erase(it++);
		++i;
	}
	return len;
}

bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now)
{
	char traceMsg[256];
	char curPathStr[128];
	unsigned int idx = ZT_MAX_PEER_NETWORK_PATHS;
	if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) {
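		// balance-xor: deterministically map the flow onto a bonded path by flow ID modulo
		// the number of bonded paths.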
		idx = abs((int)(flow->id() % (_numBondedPaths)));
		SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[_bondedIdx[idx]]->localSocket());
		_paths[_bondedIdx[idx]]->address().toString(curPathStr);
		sprintf(
			traceMsg,
			"%s (balance-xor) Assigned outgoing flow %x to peer %llx to link %s/%s, %lu active flow(s)",
			OSUtils::humanReadableTimestamp().c_str(),
			flow->id(),
			(unsigned long long)(_peer->_id.address().toInt()),
			link->ifname().c_str(),
			curPathStr,
			(unsigned long)_flows.size());
		RR->t->bondStateMessage(NULL, traceMsg);
		flow->assignPath(_paths[_bondedIdx[idx]], now);
		++(_paths[_bondedIdx[idx]]->_assignedFlowCount);
	}
	if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
		unsigned char entropy;
		Utils::getSecureRandom(&entropy, 1);
		if (_totalBondUnderload) {
			entropy %= _totalBondUnderload;
		}
		if (! _numBondedPaths) {
			sprintf(traceMsg, "%s (balance-aware) There are no bonded paths, cannot assign flow %x\n", OSUtils::humanReadableTimestamp().c_str(), flow->id());
			RR->t->bondStateMessage(NULL, traceMsg);
			return false;
		}
		/* Since there may be scenarios where a path is removed before we can re-estimate
		relative qualities (and thus allocations) we need to down-modulate the entropy
		value that we use to randomly assign among the surviving paths, otherwise we risk
		not being able to find a path to assign this flow to. */
		int totalIncompleteAllocation = 0;
		for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
			if (_paths[i] && _paths[i]->bonded()) {
				totalIncompleteAllocation += _paths[i]->_allocation;
			}
		}
		entropy %= totalIncompleteAllocation;
		for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
			if (_paths[i] && _paths[i]->bonded()) {
				SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
				_paths[i]->address().toString(curPathStr);
				uint8_t probabilitySegment = (_totalBondUnderload > 0) ? _paths[i]->_affinity : _paths[i]->_allocation;
				if (entropy <= probabilitySegment) {
					idx = i;
					break;
				}
				entropy -= probabilitySegment;
			}
		}
		if (idx < ZT_MAX_PEER_NETWORK_PATHS) {
			if (flow->_assignedPath) {
				flow->_previouslyAssignedPath = flow->_assignedPath;
			}
			flow->assignPath(_paths[idx], now);
			++(_paths[idx]->_assignedFlowCount);
		}
		else {
			fprintf(stderr, "could not assign flow?\n");
			exit(0); // TODO: Remove for production
			return false;
		}
	}
	if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
		if (_abOverflowEnabled) {
			flow->assignPath(_abPath, now);
		}
		else {
			sprintf(traceMsg, "%s (bond) Unable to assign outgoing flow %x to peer %llx, no active overflow link", OSUtils::humanReadableTimestamp().c_str(), flow->id(), (unsigned long long)(_peer->_id.address().toInt()));
			RR->t->bondStateMessage(NULL, traceMsg);
			return false;
		}
	}
	flow->assignedPath()->address().toString(curPathStr);
	SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, flow->assignedPath()->localSocket());
	sprintf(
		traceMsg,
		"%s (bond) Assigned outgoing flow %x to peer %llx to link %s/%s, %lu active flow(s)",
		OSUtils::humanReadableTimestamp().c_str(),
		flow->id(),
		(unsigned long long)(_peer->_id.address().toInt()),
		link->ifname().c_str(),
		curPathStr,
		(unsigned long)_flows.size());
	RR->t->bondStateMessage(NULL, traceMsg);
	return true;
}

SharedPtr<Flow> Bond::createFlow(const SharedPtr<Path>& path, int32_t flowId, unsigned char entropy, int64_t now)
{
	char traceMsg[256];
	char curPathStr[128];
	// ---
	if (! _numBondedPaths) {
		sprintf(traceMsg, "%s (bond) There are no bonded paths to peer %llx, cannot assign flow %x\n", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), flowId);
		RR->t->bondStateMessage(NULL, traceMsg);
		return SharedPtr<Flow>();
	}
	if (_flows.size() >= ZT_FLOW_MAX_COUNT) {
		sprintf(
			traceMsg,
			"%s (bond) Maximum number of flows on bond to peer %llx reached (%d), forcibly forgetting oldest flow\n",
			OSUtils::humanReadableTimestamp().c_str(),
			(unsigned long long)(_peer->_id.address().toInt()),
			ZT_FLOW_MAX_COUNT);
		RR->t->bondStateMessage(NULL, traceMsg);
		forgetFlowsWhenNecessary(0, true, now);
	}
	SharedPtr<Flow> flow = new Flow(flowId, now);
	_flows[flowId] = flow;
	/**
	 * Add a flow with a given Path already provided. This is the case when a packet
	 * is received on a path but no flow exists, in this case we simply assign the path
	 * that the remote peer chose for us.
	 */
	if (path) {
		flow->assignPath(path, now);
		path->address().toString(curPathStr);
		path->_assignedFlowCount++;
		SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, flow->assignedPath()->localSocket());
		sprintf(
			traceMsg,
			"%s (bond) Assigned incoming flow %x from peer %llx to link %s/%s, %lu active flow(s)",
			OSUtils::humanReadableTimestamp().c_str(),
			flow->id(),
			(unsigned long long)(_peer->_id.address().toInt()),
			link->ifname().c_str(),
			curPathStr,
			(unsigned long)_flows.size());
		RR->t->bondStateMessage(NULL, traceMsg);
	}
	/**
	 * Add a flow when no path was provided. This means that it is an outgoing packet
	 * and that it is up to the local peer to decide how to load-balance its transmission.
	 */
	else if (! path) {
		assignFlowToBondedPath(flow, now);
	}
	return flow;
}

void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now)
{
	char traceMsg[256];
	std::map<int32_t, SharedPtr<Flow> >::iterator it = _flows.begin();
	std::map<int32_t, SharedPtr<Flow> >::iterator oldestFlow = _flows.end();
	SharedPtr<Flow> expiredFlow;
	if (age) { // Remove by specific age
		while (it != _flows.end()) {
			if (it->second->age(now) > age) {
				sprintf(
					traceMsg,
					"%s (bond) Forgetting flow %x between this node and peer %llx, %lu active flow(s)",
					OSUtils::humanReadableTimestamp().c_str(),
					it->first,
					(unsigned long long)(_peer->_id.address().toInt()),
					(unsigned long)(_flows.size() - 1));
				RR->t->bondStateMessage(NULL, traceMsg);
				it->second->assignedPath()->_assignedFlowCount--;
				it = _flows.erase(it);
			}
			else {
				++it;
			}
		}
	}
	else if (oldest) { // Remove single oldest by natural expiration
		uint64_t maxAge = 0;
		while (it != _flows.end()) {
			if (it->second->age(now) > maxAge) {
				maxAge = it->second->age(now);
				oldestFlow = it;
			}
			++it;
		}
		if (oldestFlow != _flows.end()) {
			sprintf(
				traceMsg,
				"%s (bond) Forgetting oldest flow %x (of age %llu) between this node and peer %llx, %lu active flow(s)",
				OSUtils::humanReadableTimestamp().c_str(),
				oldestFlow->first,
				(unsigned long long)oldestFlow->second->age(now),
				(unsigned long long)(_peer->_id.address().toInt()),
				(unsigned long)(_flows.size() - 1));
			RR->t->bondStateMessage(NULL, traceMsg);
			oldestFlow->second->assignedPath()->_assignedFlowCount--;
			_flows.erase(oldestFlow);
		}
	}
}

void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>& path, int16_t remoteUtility)
{
	char traceMsg[256];
	if (_abLinkSelectMethod != ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) {
		return;
	}
	Mutex::Lock _l(_paths_m);
	char pathStr[128];
	path->address().toString(pathStr);
	if (! _lastPathNegotiationCheck) {
		return;
	}
	SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, path->localSocket());
	if (remoteUtility > _localUtility) {
		char pathStr[128];
		path->address().toString(pathStr);
		sprintf(
			traceMsg,
			"%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is GREATER than local utility (%d), switching to said link\n",
			OSUtils::humanReadableTimestamp().c_str(),
			(unsigned long long)(_peer->_id.address().toInt()),
			link->ifname().c_str(),
			pathStr,
			remoteUtility,
			_localUtility);
		RR->t->bondStateMessage(NULL, traceMsg);
		negotiatedPath = path;
	}
	if (remoteUtility < _localUtility) {
		sprintf(
			traceMsg,
			"%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is LESS than local utility (%d), not switching\n",
			OSUtils::humanReadableTimestamp().c_str(),
			(unsigned long long)(_peer->_id.address().toInt()),
			link->ifname().c_str(),
			pathStr,
			remoteUtility,
			_localUtility);
		RR->t->bondStateMessage(NULL, traceMsg);
	}
	if (remoteUtility == _localUtility) {
		sprintf(
			traceMsg,
			"%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is equal to local utility (%d)\n",
			OSUtils::humanReadableTimestamp().c_str(),
			(unsigned long long)(_peer->_id.address().toInt()),
			link->ifname().c_str(),
			pathStr,
			remoteUtility,
			_localUtility);
		RR->t->bondStateMessage(NULL, traceMsg);
		if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) {
			sprintf(traceMsg, "%s (bond) Agreeing with peer %llx to use alternate link %s/%s\n", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), link->ifname().c_str(), pathStr);
			RR->t->bondStateMessage(NULL, traceMsg);
			negotiatedPath = path;
		}
		else {
			sprintf(traceMsg, "%s (bond) Ignoring petition from peer %llx to use alternate link %s/%s\n", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), link->ifname().c_str(), pathStr);
			RR->t->bondStateMessage(NULL, traceMsg);
		}
	}
}

void Bond::pathNegotiationCheck(void* tPtr, const int64_t now)
{
	char pathStr[128];
	int maxInPathIdx = ZT_MAX_PEER_NETWORK_PATHS;
	int maxOutPathIdx = ZT_MAX_PEER_NETWORK_PATHS;
	uint64_t maxInCount = 0;
	uint64_t maxOutCount = 0;
	for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
		if (! _paths[i]) {
			continue;
		}
		if (_paths[i]->_packetsIn > maxInCount) {
			maxInCount = _paths[i]->_packetsIn;
			maxInPathIdx = i;
		}
		if (_paths[i]->_packetsOut > maxOutCount) {
			maxOutCount = _paths[i]->_packetsOut;
			maxOutPathIdx = i;
		}
		_paths[i]->resetPacketCounts();
	}
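	// Links are considered out of sync when the path carrying the most inbound traffic is not
	// the same path we are sending the most outbound traffic on.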
	bool _peerLinksSynchronized = ((maxInPathIdx != ZT_MAX_PEER_NETWORK_PATHS) && (maxOutPathIdx != ZT_MAX_PEER_NETWORK_PATHS) && (maxInPathIdx != maxOutPathIdx)) ? false : true;
	/**
	 * Determine utility and attempt to petition remote peer to switch to our chosen path
	 */
	if (! _peerLinksSynchronized) {
		_localUtility = _paths[maxOutPathIdx]->_failoverScore - _paths[maxInPathIdx]->_failoverScore;
		if (_paths[maxOutPathIdx]->_negotiated) {
			_localUtility -= ZT_MULTIPATH_FAILOVER_HANDICAP_NEGOTIATED;
		}
		if ((now - _lastSentPathNegotiationRequest) > ZT_PATH_NEGOTIATION_CUTOFF_TIME) {
			// fprintf(stderr, "BT: (sync) it's been long enough, sending more requests.\n");
			_numSentPathNegotiationRequests = 0;
		}
		if (_numSentPathNegotiationRequests < ZT_PATH_NEGOTIATION_TRY_COUNT) {
			if (_localUtility >= 0) {
				// fprintf(stderr, "BT: (sync) paths appear to be out of sync (utility=%d)\n", _localUtility);
				sendPATH_NEGOTIATION_REQUEST(tPtr, _paths[maxOutPathIdx]);
				++_numSentPathNegotiationRequests;
				_lastSentPathNegotiationRequest = now;
				_paths[maxOutPathIdx]->address().toString(pathStr);
				SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[maxOutPathIdx]->localSocket());
				// fprintf(stderr, "sending request to use %s on %s, ls=%llx, utility=%d\n", pathStr, link->ifname().c_str(), _paths[maxOutPathIdx]->localSocket(), _localUtility);
			}
		}
		/**
		 * Give up negotiating and consider switching
		 */
		else if ((now - _lastSentPathNegotiationRequest) > (2 * ZT_PATH_NEGOTIATION_CHECK_INTERVAL)) {
			if (_localUtility == 0) {
				// There's no loss to us, just switch without sending another request
				// fprintf(stderr, "BT: (sync) giving up, switching to remote peer's path.\n");
				negotiatedPath = _paths[maxInPathIdx];
			}
		}
	}
}

void Bond::sendPATH_NEGOTIATION_REQUEST(void* tPtr, const SharedPtr<Path>& path)
{
	char traceMsg[256];
	char pathStr[128];
	path->address().toString(pathStr);
	sprintf(
		traceMsg,
		"%s (bond) Sending link negotiation request to peer %llx via link %s/%s, local utility is %d",
		OSUtils::humanReadableTimestamp().c_str(),
		(unsigned long long)(_peer->_id.address().toInt()),
		getLink(path)->ifname().c_str(),
		pathStr,
		_localUtility);
	RR->t->bondStateMessage(NULL, traceMsg);
	if (_abLinkSelectMethod != ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) {
		return;
	}
	Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_PATH_NEGOTIATION_REQUEST);
	outp.append<int16_t>(_localUtility);
	if (path->address()) {
		outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
		RR->node->putPacket(tPtr, path->localSocket(), path->address(), outp.data(), outp.size());
	}
}

void Bond::sendACK(void* tPtr, const SharedPtr<Path>& path, const int64_t localSocket, const InetAddress& atAddress, int64_t now)
{
	Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_ACK);
	int32_t bytesToAck = 0;
	std::map<uint64_t, uint16_t>::iterator it = path->ackStatsIn.begin();
	while (it != path->ackStatsIn.end()) {
		bytesToAck += it->second;
		++it;
	}
	// char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
	// sprintf(traceMsg, "%s (qos) Sending ACK packet for %d bytes to peer %llx via link %s/%s",
	// OSUtils::humanReadableTimestamp().c_str(), bytesToAck, (unsigned long long)(_peer->_id.address().toInt()), getLink(path)->ifname().c_str(), pathStr);
	// RR->t->bondStateMessage(NULL, traceMsg);
	outp.append<uint32_t>(bytesToAck);
	if (atAddress) {
		outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
		RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size());
	}
	else {
		RR->sw->send(tPtr, outp, false);
	}
	path->ackStatsIn.clear();
	path->_packetsReceivedSinceLastAck = 0;
	path->_lastAckSent = now;
}

void Bond::sendQOS_MEASUREMENT(void* tPtr, const SharedPtr<Path>& path, const int64_t localSocket, const InetAddress& atAddress, int64_t now)
{
	// char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
	// sprintf(traceMsg, "%s (qos) Sending QoS packet to peer %llx via link %s/%s",
	// OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), getLink(path)->ifname().c_str(), pathStr);
	// RR->t->bondStateMessage(NULL, traceMsg);
	const int64_t _now = RR->node->now();
	Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_QOS_MEASUREMENT);
	char qosData[ZT_QOS_MAX_PACKET_SIZE];
	int16_t len = generateQoSPacket(path, _now, qosData);
	outp.append(qosData, len);
	if (atAddress) {
		outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
		RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size());
	}
	else {
		RR->sw->send(tPtr, outp, false);
	}
	// Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers.
	path->_packetsReceivedSinceLastQoS = 0;
	path->_lastQoSMeasurement = now;
}

void Bond::processBackgroundTasks(void* tPtr, const int64_t now)
{
	Mutex::Lock _l(_paths_m);
	if (! _peer->_canUseMultipath || (now - _lastBackgroundTaskCheck) < ZT_BOND_BACKGROUND_TASK_MIN_INTERVAL) {
		return;
	}
	_lastBackgroundTaskCheck = now;
	// Compute dynamic path monitor timer interval
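	// The interval scales with time since the last user frame (roughly 1% of the idle time),
	// bounded below by the bond-wide monitor interval and above by ZT_PATH_HEARTBEAT_PERIOD.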
	if (_linkMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC) {
		int suggestedMonitorInterval = (now - _lastFrame) / 100;
		_dynamicPathMonitorInterval = std::min(ZT_PATH_HEARTBEAT_PERIOD, ((suggestedMonitorInterval > _bondMonitorInterval) ? suggestedMonitorInterval : _bondMonitorInterval));
	}
	// TODO: Clarify and generalize this logic
	if (_linkMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC) {
		_shouldCollectPathStatistics = true;
	}
	// Memoize oft-used properties in the packet ingress/egress logic path
	if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
		// Required for real-time balancing
		_shouldCollectPathStatistics = true;
	}
	if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
		if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_BETTER) {
			// Required for judging suitability of primary link after recovery
			_shouldCollectPathStatistics = true;
		}
		if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) {
			// Required for judging suitability of new candidate primary
			_shouldCollectPathStatistics = true;
		}
	}
	if ((now - _lastCheckUserPreferences) > 1000) {
		_lastCheckUserPreferences = now;
		applyUserPrefs();
	}
	curateBond(now, false);
	if ((now - _lastQualityEstimation) > _qualityEstimationInterval) {
		_lastQualityEstimation = now;
		estimatePathQuality(now);
	}
	dumpInfo(now);
	// Send QOS/ACK packets as needed
	if (_shouldCollectPathStatistics) {
		for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
			if (_paths[i] && _paths[i]->allowed()) {
				if (_paths[i]->needsToSendQoS(now, _qosSendInterval)) {
					sendQOS_MEASUREMENT(tPtr, _paths[i], _paths[i]->localSocket(), _paths[i]->address(), now);
				}
				if (_paths[i]->needsToSendAck(now, _ackSendInterval)) {
					sendACK(tPtr, _paths[i], _paths[i]->localSocket(), _paths[i]->address(), now);
				}
			}
		}
	}
	// Perform periodic background tasks unique to each bonding policy
	switch (_bondingPolicy) {
		case ZT_BONDING_POLICY_ACTIVE_BACKUP:
			processActiveBackupTasks(tPtr, now);
			break;
		case ZT_BONDING_POLICY_BROADCAST:
			break;
		case ZT_BONDING_POLICY_BALANCE_RR:
		case ZT_BONDING_POLICY_BALANCE_XOR:
		case ZT_BONDING_POLICY_BALANCE_AWARE:
			processBalanceTasks(now);
			break;
		default:
			break;
	}
	// Check whether or not a path negotiation needs to be performed
	if (((now - _lastPathNegotiationCheck) > ZT_PATH_NEGOTIATION_CHECK_INTERVAL) && _allowPathNegotiation) {
		_lastPathNegotiationCheck = now;
		pathNegotiationCheck(tPtr, now);
	}
}

void Bond::applyUserPrefs()
{
	for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
		if (! _paths[i]) {
			continue;
		}
		SharedPtr<Link> sl = getLink(_paths[i]);
		if (sl) {
			if (sl->monitorInterval() == 0) { // If no interval was specified for this link, use more generic bond-wide interval
				sl->setMonitorInterval(_bondMonitorInterval);
			}
			RR->bc->setMinReqPathMonitorInterval((sl->monitorInterval() < RR->bc->minReqPathMonitorInterval()) ? sl->monitorInterval() : RR->bc->minReqPathMonitorInterval());
			bool bFoundCommonLink = false;
			SharedPtr<Link> commonLink = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
			for (unsigned int j = 0; j < ZT_MAX_PEER_NETWORK_PATHS; ++j) {
				if (_paths[j] && _paths[j].ptr() != _paths[i].ptr()) {
					if (RR->bc->getLinkBySocket(_policyAlias, _paths[j]->localSocket()) == commonLink) {
						bFoundCommonLink = true;
					}
				}
			}
			_paths[i]->_monitorInterval = sl->monitorInterval();
			_paths[i]->_upDelay = sl->upDelay() ? sl->upDelay() : _upDelay;
			_paths[i]->_downDelay = sl->downDelay() ? sl->downDelay() : _downDelay;
			_paths[i]->_ipvPref = sl->ipvPref();
			_paths[i]->_mode = sl->mode();
			_paths[i]->_enabled = sl->enabled();
			_paths[i]->_onlyPathOnLink = ! bFoundCommonLink;
		}
	}
	if (_peer) {
		_peer->_shouldCollectPathStatistics = _shouldCollectPathStatistics;
		_peer->_bondingPolicy = _bondingPolicy;
	}
}

void Bond::curateBond(const int64_t now, bool rebuildBond)
{
	char traceMsg[256];
	char pathStr[128];
	uint8_t tmpNumAliveLinks = 0;
	uint8_t tmpNumTotalLinks = 0;
	/**
	 * Update path states
	 */
	for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
		if (! _paths[i]) {
			continue;
		}
		tmpNumTotalLinks++;
		if (_paths[i]->alive(now, true)) {
			tmpNumAliveLinks++;
		}
		bool currEligibility = _paths[i]->eligible(now, _ackSendInterval);
		if (currEligibility != _paths[i]->_lastEligibilityState) {
			_paths[i]->address().toString(pathStr);
			sprintf(
				traceMsg,
				"%s (bond) Eligibility of link %s/%s to peer %llx has changed from %d to %d",
				OSUtils::humanReadableTimestamp().c_str(),
				getLink(_paths[i])->ifname().c_str(),
				pathStr,
				(unsigned long long)(_peer->_id.address().toInt()),
				_paths[i]->_lastEligibilityState,
				currEligibility);
			RR->t->bondStateMessage(NULL, traceMsg);
			if (currEligibility) {
				rebuildBond = true;
			}
			if (! currEligibility) {
				_paths[i]->adjustRefractoryPeriod(now, _defaultPathRefractoryPeriod, ! currEligibility);
				if (_paths[i]->bonded()) {
					char pathStr[128];
					_paths[i]->address().toString(pathStr);
					sprintf(
						traceMsg,
						"%s (bond) Link %s/%s to peer %llx was bonded, reallocation of its flows will occur soon",
						OSUtils::humanReadableTimestamp().c_str(),
						getLink(_paths[i])->ifname().c_str(),
						pathStr,
						(unsigned long long)(_peer->_id.address().toInt()));
					RR->t->bondStateMessage(NULL, traceMsg);
					rebuildBond = true;
					_paths[i]->_shouldReallocateFlows = _paths[i]->bonded();
					_paths[i]->setBonded(false);
				}
				else {
					sprintf(
						traceMsg,
						"%s (bond) Link %s/%s to peer %llx was not bonded, no allocation consequences",
						OSUtils::humanReadableTimestamp().c_str(),
						getLink(_paths[i])->ifname().c_str(),
						pathStr,
						(unsigned long long)(_peer->_id.address().toInt()));
					RR->t->bondStateMessage(NULL, traceMsg);
				}
			}
		}
		if (currEligibility) {
			_paths[i]->adjustRefractoryPeriod(now, _defaultPathRefractoryPeriod, false);
		}
		_paths[i]->_lastEligibilityState = currEligibility;
	}
	_numAliveLinks = tmpNumAliveLinks;
	_numTotalLinks = tmpNumTotalLinks;
	/* Determine health status to report to user */
	bool tmpHealthStatus = true;
	if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
		if (_numAliveLinks < 2) {
			// Considered healthy if there is at least one failover link
			tmpHealthStatus = false;
		}
	}
	if (_bondingPolicy == ZT_BONDING_POLICY_BROADCAST) {
		if (_numAliveLinks < 1) {
			// Considered healthy if we're able to send frames at all
			tmpHealthStatus = false;
		}
	}
	if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
		if (_numAliveLinks < _numTotalLinks) {
			// Considered healthy if all known paths are alive, this should be refined to account for user bond config settings
			tmpHealthStatus = false;
		}
	}
	if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) {
		if (_numAliveLinks < _numTotalLinks) {
			// Considered healthy if all known paths are alive, this should be refined to account for user bond config settings
			tmpHealthStatus = false;
		}
	}
	if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
		if (_numAliveLinks < _numTotalLinks) {
			// Considered healthy if all known paths are alive, this should be refined to account for user bond config settings
			tmpHealthStatus = false;
		}
	}
	if (tmpHealthStatus != _isHealthy) {
		std::string healthStatusStr;
		if (tmpHealthStatus == true) {
			healthStatusStr = "HEALTHY";
		}
		else {
			healthStatusStr = "DEGRADED";
		}
		sprintf(traceMsg, "%s (bond) Bond to peer %llx is in a %s state (%d/%d links)", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), healthStatusStr.c_str(), _numAliveLinks, _numTotalLinks);
		RR->t->bondStateMessage(NULL, traceMsg);
	}
	_isHealthy = tmpHealthStatus;
	/**
	 * Curate the set of paths that are part of the bond proper. Selects a single path
	 * per logical link according to eligibility and user-specified constraints.
	 */
	if ((_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) || (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) || (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE)) {
		if (! _numBondedPaths) {
			rebuildBond = true;
		}
		// TODO: Optimize
		if (rebuildBond) {
			int updatedBondedPathCount = 0;
			std::map<SharedPtr<Link>, int> linkMap;
			for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
				if (_paths[i] && _paths[i]->allowed() && (_paths[i]->eligible(now, _ackSendInterval) || ! _numBondedPaths)) {
					SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
					if (! linkMap.count(link)) {
						linkMap[link] = i;
					}
					else {
						bool overriden = false;
						_paths[i]->address().toString(pathStr);
						// fprintf(stderr, " link representative path already exists! (%s %s)\n", getLink(_paths[i])->ifname().c_str(), pathStr);
						if (_paths[i]->preferred() && ! _paths[linkMap[link]]->preferred()) {
							// Override previous choice if preferred
							if (_paths[linkMap[link]]->_assignedFlowCount) {
								_paths[linkMap[link]]->_deprecated = true;
							}
							else {
								_paths[linkMap[link]]->_deprecated = true;
								_paths[linkMap[link]]->setBonded(false);
							}
							linkMap[link] = i;
							overriden = true;
						}
						if ((_paths[i]->preferred() && _paths[linkMap[link]]->preferred()) || (! _paths[i]->preferred() && ! _paths[linkMap[link]]->preferred())) {
							if (_paths[i]->preferenceRank() > _paths[linkMap[link]]->preferenceRank()) {
								// Override if higher preference
								if (_paths[linkMap[link]]->_assignedFlowCount) {
									_paths[linkMap[link]]->_deprecated = true;
								}
								else {
									_paths[linkMap[link]]->_deprecated = true;
									_paths[linkMap[link]]->setBonded(false);
								}
								linkMap[link] = i;
							}
						}
					}
				}
			}
			std::map<SharedPtr<Link>, int>::iterator it = linkMap.begin();
			for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
				if (! _paths[i]) {
					continue;
				}
				_bondedIdx[i] = ZT_MAX_PEER_NETWORK_PATHS;
				if (it != linkMap.end()) {
					_bondedIdx[i] = it->second;
					_paths[_bondedIdx[i]]->setBonded(true);
					++it;
					++updatedBondedPathCount;
					_paths[_bondedIdx[i]]->address().toString(pathStr);
					// fprintf(stderr, "setting i=%d, _bondedIdx[%d]=%d to bonded (%s %s)\n", i, i, _bondedIdx[i], getLink(_paths[_bondedIdx[i]])->ifname().c_str(), pathStr);
				}
			}
			_numBondedPaths = updatedBondedPathCount;
			if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
				// Cause a RR reset since the currently used index might no longer be valid
				_rrPacketsSentOnCurrLink = _packetsPerLink;
			}
		}
	}
}

void Bond::estimatePathQuality(const int64_t now)
{
	uint32_t totUserSpecifiedLinkSpeed = 0;
	if (_numBondedPaths) { // Compute relative user-specified speeds of links
		for (unsigned int i = 0; i < _numBondedPaths; ++i) {
			if (_paths[i] && _paths[i]->allowed()) {
				SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
				totUserSpecifiedLinkSpeed += link->speed();
			}
		}
		for (unsigned int i = 0; i < _numBondedPaths; ++i) {
			if (_paths[i] && _paths[i]->allowed()) {
				SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
				link->setRelativeSpeed((uint8_t)round(((float)link->speed() / (float)totUserSpecifiedLinkSpeed) * 255));
			}
		}
	}
	float lat[ZT_MAX_PEER_NETWORK_PATHS];
	float pdv[ZT_MAX_PEER_NETWORK_PATHS];
	float plr[ZT_MAX_PEER_NETWORK_PATHS];
	float per[ZT_MAX_PEER_NETWORK_PATHS];
	float maxLAT = 0;
	float maxPDV = 0;
	float maxPLR = 0;
	float maxPER = 0;
	float quality[ZT_MAX_PEER_NETWORK_PATHS];
	uint8_t alloc[ZT_MAX_PEER_NETWORK_PATHS];
	float totQuality = 0.0f;
	memset(&lat, 0, sizeof(lat));
	memset(&pdv, 0, sizeof(pdv));
	memset(&plr, 0, sizeof(plr));
	memset(&per, 0, sizeof(per));
	memset(&quality, 0, sizeof(quality));
	memset(&alloc, 0, sizeof(alloc));
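	// Allocation proceeds in three passes: (1) summarize each path's observed latency, delay
	// variance, loss and error rates and squash each into roughly (0,1] via 1/exp(4*x), where x
	// is the observation normalized against its maximum acceptable value; (2) weight each path's
	// share of the bond-wide best value for each metric by _qualityWeights; (3) scale each bonded
	// path's share of total quality to an 8-bit _allocation value.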
	// Compute initial summary statistics
	for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
		if (! _paths[i] || ! _paths[i]->allowed()) {
			continue;
		}
		// Compute/Smooth average of real-world observations
		_paths[i]->_latencyMean = _paths[i]->latencySamples.mean();
		_paths[i]->_latencyVariance = _paths[i]->latencySamples.stddev();
		_paths[i]->_packetErrorRatio = 1.0 - (_paths[i]->packetValiditySamples.count() ? _paths[i]->packetValiditySamples.mean() : 1.0);
		if (userHasSpecifiedLinkSpeeds()) {
			// Use user-reported metrics
			SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
			if (link) {
				_paths[i]->_throughputMean = link->speed();
				_paths[i]->_throughputVariance = 0;
			}
		}
		// Drain unacknowledged QoS records
		std::map<uint64_t, uint64_t>::iterator it = _paths[i]->qosStatsOut.begin();
		uint64_t currentLostRecords = 0;
		while (it != _paths[i]->qosStatsOut.end()) {
			int qosRecordTimeout = 5000; //_paths[i]->monitorInterval() * ZT_MULTIPATH_QOS_ACK_INTERVAL_MULTIPLIER * 8;
			if ((now - it->second) >= qosRecordTimeout) {
				// Packet was lost
				it = _paths[i]->qosStatsOut.erase(it);
				++currentLostRecords;
			}
			else {
				++it;
			}
		}
		quality[i] = 0;
		totQuality = 0;
		// Normalize raw observations according to sane limits and/or user specified values
		lat[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_latencyMean, 0, _maxAcceptableLatency, 0, 1));
		pdv[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_latencyVariance, 0, _maxAcceptablePacketDelayVariance, 0, 1));
		plr[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_packetLossRatio, 0, _maxAcceptablePacketLossRatio, 0, 1));
		per[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_packetErrorRatio, 0, _maxAcceptablePacketErrorRatio, 0, 1));
		// Record bond-wide maximums to determine relative values
		maxLAT = lat[i] > maxLAT ? lat[i] : maxLAT;
		maxPDV = pdv[i] > maxPDV ? pdv[i] : maxPDV;
		maxPLR = plr[i] > maxPLR ? plr[i] : maxPLR;
		maxPER = per[i] > maxPER ? per[i] : maxPER;
	}
	// Convert metrics to relative quantities and apply contribution weights
	for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
		if (_paths[i] && _paths[i]->bonded()) {
			quality[i] += ((maxLAT > 0.0f ? lat[i] / maxLAT : 0.0f) * _qualityWeights[ZT_QOS_LAT_IDX]);
			quality[i] += ((maxPDV > 0.0f ? pdv[i] / maxPDV : 0.0f) * _qualityWeights[ZT_QOS_PDV_IDX]);
			quality[i] += ((maxPLR > 0.0f ? plr[i] / maxPLR : 0.0f) * _qualityWeights[ZT_QOS_PLR_IDX]);
			quality[i] += ((maxPER > 0.0f ? per[i] / maxPER : 0.0f) * _qualityWeights[ZT_QOS_PER_IDX]);
			totQuality += quality[i];
		}
	}
	// Normalize to 8-bit allocation values
	for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
		if (_paths[i] && _paths[i]->bonded()) {
			alloc[i] = (uint8_t)(std::ceil((quality[i] / totQuality) * (float)255));
			_paths[i]->_allocation = alloc[i];
		}
	}
}

void Bond::processBalanceTasks(const int64_t now)
{
	char curPathStr[128];
	// TODO: Generalize
	int totalAllocation = 0;
	for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
		if (! _paths[i]) {
			continue;
		}
		if (_paths[i] && _paths[i]->bonded() && _paths[i]->eligible(now, _ackSendInterval)) {
			totalAllocation += _paths[i]->_allocation;
		}
	}
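	// A bonded path is treated as under-performing when its allocation drops below roughly one
	// third of the average allocation across the bond's paths.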
  1124. unsigned char minimumAllocationValue = (uint8_t)(0.33 * ((float)totalAllocation / (float)_numBondedPaths));
  1125. if (_allowFlowHashing) {
  1126. /**
  1127. * Clean up and reset flows if necessary
  1128. */
  1129. if ((now - _lastFlowExpirationCheck) > ZT_MULTIPATH_FLOW_CHECK_INTERVAL) {
  1130. Mutex::Lock _l(_flows_m);
  1131. forgetFlowsWhenNecessary(ZT_MULTIPATH_FLOW_EXPIRATION_INTERVAL, false, now);
  1132. _lastFlowExpirationCheck = now;
  1133. }
  1134. if ((now - _lastFlowStatReset) > ZT_FLOW_STATS_RESET_INTERVAL) {
  1135. Mutex::Lock _l(_flows_m);
  1136. _lastFlowStatReset = now;
  1137. std::map<int32_t, SharedPtr<Flow> >::iterator it = _flows.begin();
  1138. while (it != _flows.end()) {
  1139. it->second->resetByteCounts();
  1140. ++it;
  1141. }
  1142. }
  1143. /**
  1144. * Re-allocate flows from dead paths
  1145. */
  1146. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
  1147. Mutex::Lock _l(_flows_m);
  1148. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1149. if (! _paths[i]) {
  1150. continue;
  1151. }
  1152. if (! _paths[i]->eligible(now, _ackSendInterval) && _paths[i]->_shouldReallocateFlows) {
  1153. char traceMsg[256];
  1154. char pathStr[128];
  1155. _paths[i]->address().toString(pathStr);
  1156. sprintf(
  1157. traceMsg,
  1158. "%s (balance-*) Reallocating flows to peer %llx from dead link %s/%s to surviving links",
  1159. OSUtils::humanReadableTimestamp().c_str(),
  1160. (unsigned long long)(_peer->_id.address().toInt()),
  1161. getLink(_paths[i])->ifname().c_str(),
  1162. pathStr);
  1163. RR->t->bondStateMessage(NULL, traceMsg);
  1164. std::map<int32_t, SharedPtr<Flow> >::iterator flow_it = _flows.begin();
  1165. while (flow_it != _flows.end()) {
  1166. if (flow_it->second->assignedPath() == _paths[i]) {
  1167. if (assignFlowToBondedPath(flow_it->second, now)) {
  1168. _paths[i]->_assignedFlowCount--;
  1169. }
  1170. }
  1171. ++flow_it;
  1172. }
  1173. _paths[i]->_shouldReallocateFlows = false;
  1174. }
  1175. }
  1176. }
  1177. /**
* Re-allocate flows from under-performing links
  1179. * NOTE: This could be part of the above block but was kept separate for clarity.
  1180. */
  1181. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
  1182. Mutex::Lock _l(_flows_m);
  1183. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1184. if (! _paths[i]) {
  1185. continue;
  1186. }
  1187. if (_paths[i] && _paths[i]->bonded() && _paths[i]->eligible(now, _ackSendInterval) && (_paths[i]->_allocation < minimumAllocationValue) && _paths[i]->_assignedFlowCount) {
  1188. _paths[i]->address().toString(curPathStr);
  1189. char traceMsg[256];
  1190. char pathStr[128];
  1191. _paths[i]->address().toString(pathStr);
  1192. sprintf(
  1193. traceMsg,
  1194. "%s (balance-*) Reallocating flows to peer %llx from under-performing link %s/%s\n",
  1195. OSUtils::humanReadableTimestamp().c_str(),
  1196. (unsigned long long)(_peer->_id.address().toInt()),
  1197. getLink(_paths[i])->ifname().c_str(),
  1198. pathStr);
  1199. RR->t->bondStateMessage(NULL, traceMsg);
  1200. std::map<int32_t, SharedPtr<Flow> >::iterator flow_it = _flows.begin();
  1201. while (flow_it != _flows.end()) {
  1202. if (flow_it->second->assignedPath() == _paths[i]) {
  1203. if (assignFlowToBondedPath(flow_it->second, now)) {
  1204. _paths[i]->_assignedFlowCount--;
  1205. }
  1206. }
  1207. ++flow_it;
  1208. }
  1209. _paths[i]->_shouldReallocateFlows = false;
  1210. }
  1211. }
  1212. }
  1213. }
  1214. /**
  1215. * Tasks specific to (Balance Round Robin)
  1216. */
  1217. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
  1218. // Nothing
  1219. }
  1220. /**
  1221. * Tasks specific to (Balance XOR)
  1222. */
  1223. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) {
  1224. // Nothing
  1225. }
  1226. /**
  1227. * Tasks specific to (Balance Aware)
  1228. */
  1229. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
  1230. if (_allowFlowHashing) {
  1231. Mutex::Lock _l(_flows_m);
  1232. if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE) {
  1233. // Do nothing here, this is taken care of in the more general case above.
  1234. }
  1235. if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_OPPORTUNISTIC) {
  1236. // If the flow is temporarily inactive we should take this opportunity to re-assign the flow if needed.
  1237. }
  1238. if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_AGGRESSIVE) {
  1239. /**
  1240. * Return flows to the original path if it has once again become available
  1241. */
  1242. if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) {
  1243. std::map<int32_t, SharedPtr<Flow> >::iterator flow_it = _flows.begin();
  1244. while (flow_it != _flows.end()) {
  1245. if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval) && (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) {
  1246. // fprintf(stderr, "moving flow back onto its previous path assignment (based on eligibility)\n");
  1247. (flow_it->second->_assignedPath->_assignedFlowCount)--;
  1248. flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath, now);
  1249. (flow_it->second->_previouslyAssignedPath->_assignedFlowCount)++;
  1250. }
  1251. ++flow_it;
  1252. }
  1253. _lastFlowRebalance = now;
  1254. }
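// NOTE: the pass above refreshes _lastFlowRebalance, so the performance-based pass below
// cannot trigger during the same call; as written the two passes are identical in effect.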
  1255. /**
* Return flows to their original path once it has again become performant
  1257. */
  1258. if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) {
  1259. std::map<int32_t, SharedPtr<Flow> >::iterator flow_it = _flows.begin();
  1260. while (flow_it != _flows.end()) {
  1261. if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval) && (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) {
  1262. // fprintf(stderr, "moving flow back onto its previous path assignment (based on performance)\n");
  1263. (flow_it->second->_assignedPath->_assignedFlowCount)--;
  1264. flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath, now);
  1265. (flow_it->second->_previouslyAssignedPath->_assignedFlowCount)++;
  1266. }
  1267. ++flow_it;
  1268. }
  1269. _lastFlowRebalance = now;
  1270. }
  1271. }
  1272. }
  1273. else if (! _allowFlowHashing) {
  1274. // Nothing
  1275. }
  1276. }
  1277. }
  1278. void Bond::dequeueNextActiveBackupPath(const uint64_t now)
  1279. {
  1280. if (_abFailoverQueue.empty()) {
  1281. return;
  1282. }
  1283. _abPath = _abFailoverQueue.front();
  1284. _abFailoverQueue.pop_front();
  1285. _lastActiveBackupPathChange = now;
  1286. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1287. if (_paths[i]) {
  1288. _paths[i]->resetPacketCounts();
  1289. }
  1290. }
  1291. }
  1292. bool Bond::abForciblyRotateLink()
  1293. {
  1294. char traceMsg[256];
  1295. char prevPathStr[128];
  1296. char curPathStr[128];
  1297. if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
  1298. SharedPtr<Path> prevPath = _abPath;
  1299. _abPath->address().toString(prevPathStr);
  1300. dequeueNextActiveBackupPath(RR->node->now());
  1301. _abPath->address().toString(curPathStr);
  1302. sprintf(
  1303. traceMsg,
  1304. "%s (active-backup) Forcibly rotating peer %llx link from %s/%s to %s/%s",
  1305. OSUtils::humanReadableTimestamp().c_str(),
  1306. (unsigned long long)(_peer->_id.address().toInt()),
  1307. getLink(prevPath)->ifname().c_str(),
  1308. prevPathStr,
  1309. getLink(_abPath)->ifname().c_str(),
  1310. curPathStr);
  1311. RR->t->bondStateMessage(NULL, traceMsg);
  1312. return true;
  1313. }
  1314. return false;
  1315. }
  1316. void Bond::processActiveBackupTasks(void* tPtr, const int64_t now)
  1317. {
  1318. char traceMsg[256];
  1319. char pathStr[128];
  1320. char prevPathStr[128];
  1321. char curPathStr[128];
  1322. SharedPtr<Path> prevActiveBackupPath = _abPath;
  1323. SharedPtr<Path> nonPreferredPath;
  1324. bool bFoundPrimaryLink = false;
  1325. /**
  1326. * Generate periodic status report
  1327. */
  1328. if ((now - _lastBondStatusLog) > ZT_MULTIPATH_BOND_STATUS_INTERVAL) {
  1329. _lastBondStatusLog = now;
  1330. if (_abPath) {
  1331. _abPath->address().toString(curPathStr);
  1332. sprintf(
  1333. traceMsg,
  1334. "%s (active-backup) Active link to peer %llx is %s/%s, failover queue size is %zu",
  1335. OSUtils::humanReadableTimestamp().c_str(),
  1336. (unsigned long long)(_peer->_id.address().toInt()),
  1337. getLink(_abPath)->ifname().c_str(),
  1338. curPathStr,
  1339. _abFailoverQueue.size());
  1340. RR->t->bondStateMessage(NULL, traceMsg);
  1341. }
  1342. else {
  1343. sprintf(traceMsg, "%s (active-backup) No active link to peer %llx", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1344. RR->t->bondStateMessage(NULL, traceMsg);
  1345. }
  1346. if (_abFailoverQueue.empty()) {
  1347. sprintf(traceMsg, "%s (active-backup) Failover queue is empty, bond to peer %llx is NOT currently fault-tolerant", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1348. RR->t->bondStateMessage(NULL, traceMsg);
  1349. }
  1350. }
  1351. /**
  1352. * Select initial "active" active-backup link
  1353. */
  1354. if (! _abPath) {
  1355. /**
  1356. * [Automatic mode]
* The user has not explicitly specified links or a failover schedule, so the
* bonding policy selects the first eligible path and sets it as the active
* backup path. If a substantially better path is detected, the policy assigns
* it as the new active backup path; if the path fails, it simply finds the
* next eligible path.
  1362. */
  1363. if (! userHasSpecifiedLinks()) {
  1364. sprintf(traceMsg, "%s (active-backup) No links to peer %llx specified. Searching...", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1365. RR->t->bondStateMessage(NULL, traceMsg);
  1366. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1367. if (_paths[i] && _paths[i]->eligible(now, _ackSendInterval)) {
  1368. _paths[i]->address().toString(curPathStr);
  1369. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  1370. if (link) {
  1371. sprintf(
  1372. traceMsg,
  1373. "%s (active-backup) Found eligible link %s/%s to peer %llx",
  1374. OSUtils::humanReadableTimestamp().c_str(),
  1375. getLink(_paths[i])->ifname().c_str(),
  1376. curPathStr,
  1377. (unsigned long long)(_peer->_id.address().toInt()));
  1378. RR->t->bondStateMessage(NULL, traceMsg);
  1379. }
  1380. _abPath = _paths[i];
  1381. break;
  1382. }
  1383. }
  1384. }
  1385. /**
  1386. * [Manual mode]
  1387. * The user has specified links or failover rules that the bonding policy should adhere to.
  1388. */
  1389. else if (userHasSpecifiedLinks()) {
  1390. if (userHasSpecifiedPrimaryLink()) {
  1391. // sprintf(traceMsg, "%s (active-backup) Checking local.conf for user-specified primary link\n", OSUtils::humanReadableTimestamp().c_str());
  1392. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1393. if (! _paths[i]) {
  1394. continue;
  1395. }
  1396. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  1397. if (_paths[i]->eligible(now, _ackSendInterval) && link->primary()) {
  1398. if (! _paths[i]->preferred()) {
  1399. _paths[i]->address().toString(curPathStr);
  1400. // Found path on primary link, take note in case we don't find a preferred path
  1401. nonPreferredPath = _paths[i];
  1402. bFoundPrimaryLink = true;
  1403. }
  1404. if (_paths[i]->preferred()) {
  1405. _abPath = _paths[i];
  1406. _abPath->address().toString(curPathStr);
  1407. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  1408. bFoundPrimaryLink = true;
break; // Found preferred path on primary link
  1410. }
  1411. }
  1412. }
  1413. if (_abPath) {
  1414. _abPath->address().toString(curPathStr);
  1415. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _abPath->localSocket());
  1416. if (link) {
  1417. sprintf(
  1418. traceMsg,
  1419. "%s (active-backup) Found preferred primary link %s/%s to peer %llx",
  1420. OSUtils::humanReadableTimestamp().c_str(),
  1421. getLink(_abPath)->ifname().c_str(),
  1422. curPathStr,
  1423. (unsigned long long)(_peer->_id.address().toInt()));
  1424. RR->t->bondStateMessage(NULL, traceMsg);
  1425. }
  1426. }
  1427. else {
  1428. if (bFoundPrimaryLink && nonPreferredPath) {
  1429. sprintf(traceMsg, "%s (active-backup) Found non-preferred primary link to peer %llx", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1430. RR->t->bondStateMessage(NULL, traceMsg);
  1431. _abPath = nonPreferredPath;
  1432. }
  1433. }
  1434. if (! _abPath) {
  1435. sprintf(traceMsg, "%s (active-backup) Designated primary link to peer %llx is not yet ready", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1436. RR->t->bondStateMessage(NULL, traceMsg);
  1437. // TODO: Should wait for some time (failover interval?) and then switch to spare link
  1438. }
  1439. }
  1440. else if (! userHasSpecifiedPrimaryLink()) {
  1441. int _abIdx = ZT_MAX_PEER_NETWORK_PATHS;
  1442. sprintf(traceMsg, "%s (active-backup) User did not specify a primary link to peer %llx, selecting first available link", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1443. RR->t->bondStateMessage(NULL, traceMsg);
  1444. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1445. if (_paths[i] && _paths[i]->eligible(now, _ackSendInterval)) {
  1446. _abIdx = i;
  1447. break;
  1448. }
  1449. }
  1450. if (_abIdx == ZT_MAX_PEER_NETWORK_PATHS) {
  1451. // Unable to find a candidate next-best, no change
  1452. }
  1453. else {
  1454. _abPath = _paths[_abIdx];
  1455. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _abPath->localSocket());
  1456. if (link) {
  1457. _abPath->address().toString(curPathStr);
  1458. sprintf(
  1459. traceMsg,
  1460. "%s (active-backup) Selected non-primary link %s/%s to peer %llx",
  1461. OSUtils::humanReadableTimestamp().c_str(),
  1462. getLink(_abPath)->ifname().c_str(),
  1463. curPathStr,
  1464. (unsigned long long)(_peer->_id.address().toInt()));
  1465. RR->t->bondStateMessage(NULL, traceMsg);
  1466. }
  1467. }
  1468. }
  1469. }
  1470. }
  1471. /**
  1472. * Update and maintain the active-backup failover queue
  1473. */
  1474. if (_abPath) {
  1475. // Don't worry about the failover queue until we have an active link
  1476. // Remove ineligible paths from the failover link queue
  1477. for (std::list<SharedPtr<Path> >::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end();) {
  1478. if ((*it) && ! (*it)->eligible(now, _ackSendInterval)) {
  1479. (*it)->address().toString(curPathStr);
  1480. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, (*it)->localSocket());
  1481. it = _abFailoverQueue.erase(it);
  1482. if (link) {
  1483. sprintf(
  1484. traceMsg,
  1485. "%s (active-backup) Link %s/%s to peer %llx is now ineligible, removing from failover queue, there are %zu links in the queue",
  1486. OSUtils::humanReadableTimestamp().c_str(),
link->ifname().c_str(),
  1488. curPathStr,
  1489. (unsigned long long)(_peer->_id.address().toInt()),
  1490. _abFailoverQueue.size());
  1491. RR->t->bondStateMessage(NULL, traceMsg);
  1492. }
  1493. }
  1494. else {
  1495. ++it;
  1496. }
  1497. }
  1498. /**
* Failover instructions were provided by the user; build the queue according to those as well as IPv
* preference, disregarding performance.
  1501. */
  1502. if (userHasSpecifiedFailoverInstructions()) {
  1503. /**
  1504. * Clear failover scores
  1505. */
  1506. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1507. if (_paths[i]) {
  1508. _paths[i]->_failoverScore = 0;
  1509. }
  1510. }
  1511. // Follow user-specified failover instructions
  1512. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1513. if (! _paths[i] || ! _paths[i]->allowed() || ! _paths[i]->eligible(now, _ackSendInterval)) {
  1514. continue;
  1515. }
  1516. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  1517. _paths[i]->address().toString(pathStr);
  1518. int failoverScoreHandicap = _paths[i]->_failoverScore;
  1519. if (_paths[i]->preferred()) {
  1520. failoverScoreHandicap += ZT_MULTIPATH_FAILOVER_HANDICAP_PREFERRED;
  1521. }
  1522. if (link->primary()) {
  1523. // If using "optimize" primary reselect mode, ignore user link designations
  1524. failoverScoreHandicap += ZT_MULTIPATH_FAILOVER_HANDICAP_PRIMARY;
  1525. }
  1526. if (! _paths[i]->_failoverScore) {
  1527. // If we didn't inherit a failover score from a "parent" that wants to use this path as a failover
  1528. int newHandicap = failoverScoreHandicap ? failoverScoreHandicap : _paths[i]->_allocation;
  1529. _paths[i]->_failoverScore = newHandicap;
  1530. }
  1531. SharedPtr<Link> failoverLink;
  1532. if (link->failoverToLink().length()) {
  1533. failoverLink = RR->bc->getLinkByName(_policyAlias, link->failoverToLink());
  1534. }
  1535. if (failoverLink) {
  1536. for (int j = 0; j < ZT_MAX_PEER_NETWORK_PATHS; j++) {
  1537. if (_paths[j] && getLink(_paths[j]) == failoverLink.ptr()) {
  1538. _paths[j]->address().toString(pathStr);
  1539. int inheritedHandicap = failoverScoreHandicap - 10;
  1540. int newHandicap = _paths[j]->_failoverScore > inheritedHandicap ? _paths[j]->_failoverScore : inheritedHandicap;
  1541. if (! _paths[j]->preferred()) {
  1542. newHandicap--;
  1543. }
  1544. _paths[j]->_failoverScore = newHandicap;
  1545. }
  1546. }
  1547. }
  1548. if (_paths[i].ptr() != _abPath.ptr()) {
  1549. bool bFoundPathInQueue = false;
  1550. for (std::list<SharedPtr<Path> >::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end(); ++it) {
  1551. if (_paths[i].ptr() == (*it).ptr()) {
  1552. bFoundPathInQueue = true;
  1553. }
  1554. }
  1555. if (! bFoundPathInQueue) {
  1556. _abFailoverQueue.push_front(_paths[i]);
  1557. _paths[i]->address().toString(curPathStr);
  1558. sprintf(
  1559. traceMsg,
  1560. "%s (active-backup) Added link %s/%s to peer %llx to failover queue, there are %zu links in the queue",
  1561. OSUtils::humanReadableTimestamp().c_str(),
getLink(_paths[i])->ifname().c_str(),
  1563. curPathStr,
  1564. (unsigned long long)(_peer->_id.address().toInt()),
  1565. _abFailoverQueue.size());
  1566. RR->t->bondStateMessage(NULL, traceMsg);
  1567. }
  1568. }
  1569. }
  1570. }
  1571. /**
  1572. * No failover instructions provided by user, build queue according to performance
  1573. * and IPv preference.
  1574. */
  1575. else if (! userHasSpecifiedFailoverInstructions()) {
  1576. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1577. if (! _paths[i] || ! _paths[i]->allowed() || ! _paths[i]->eligible(now, _ackSendInterval)) {
  1578. continue;
  1579. }
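// The failover score below is the path's 8-bit allocation plus a situational handicap: a bonus
// for preferred paths, for primary links (unless the "optimize" reselection policy is in use),
// or for the negotiated path, and a large penalty for paths still in a refractory period so
// they rank poorly when the queue is sorted.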
  1580. int failoverScoreHandicap = 0;
  1581. if (_paths[i]->preferred()) {
  1582. failoverScoreHandicap = ZT_MULTIPATH_FAILOVER_HANDICAP_PREFERRED;
  1583. }
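// NOTE: unlike other call sites, the next eligibility check passes a bool where eligible()
// otherwise receives _ackSendInterval; the implicit conversion may not express the intended
// refractory-period behavior.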
  1584. bool includeRefractoryPeriod = true;
  1585. if (! _paths[i]->eligible(now, includeRefractoryPeriod)) {
  1586. failoverScoreHandicap = -10000;
  1587. }
  1588. if (getLink(_paths[i])->primary() && _abLinkSelectMethod != ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) {
  1589. // If using "optimize" primary reselect mode, ignore user link designations
  1590. failoverScoreHandicap = ZT_MULTIPATH_FAILOVER_HANDICAP_PRIMARY;
  1591. }
  1592. if (_paths[i].ptr() == negotiatedPath.ptr()) {
  1593. _paths[i]->_negotiated = true;
  1594. failoverScoreHandicap = ZT_MULTIPATH_FAILOVER_HANDICAP_NEGOTIATED;
  1595. }
  1596. else {
  1597. _paths[i]->_negotiated = false;
  1598. }
  1599. _paths[i]->_failoverScore = _paths[i]->_allocation + failoverScoreHandicap;
  1600. if (_paths[i].ptr() != _abPath.ptr()) {
  1601. bool bFoundPathInQueue = false;
  1602. for (std::list<SharedPtr<Path> >::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end(); ++it) {
  1603. if (_paths[i].ptr() == (*it).ptr()) {
  1604. bFoundPathInQueue = true;
  1605. }
  1606. }
  1607. if (! bFoundPathInQueue) {
  1608. _abFailoverQueue.push_front(_paths[i]);
  1609. _paths[i]->address().toString(curPathStr);
  1610. sprintf(
  1611. traceMsg,
  1612. "%s (active-backup) Added link %s/%s to peer %llx to failover queue, there are %zu links in the queue",
  1613. OSUtils::humanReadableTimestamp().c_str(),
  1614. getLink(_paths[i])->ifname().c_str(),
  1615. curPathStr,
  1616. (unsigned long long)(_peer->_id.address().toInt()),
  1617. _abFailoverQueue.size());
  1618. RR->t->bondStateMessage(NULL, traceMsg);
  1619. }
  1620. }
  1621. }
  1622. }
  1623. _abFailoverQueue.sort(PathQualityComparator());
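// Sorting places the most suitable candidate (per PathQualityComparator) at the front, since
// dequeueNextActiveBackupPath() always promotes the front element.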
  1624. }
  1625. /**
  1626. * Short-circuit if we have no queued paths
  1627. */
  1628. if (_abFailoverQueue.empty()) {
  1629. return;
  1630. }
  1631. /**
  1632. * Fulfill primary reselect obligations
  1633. */
  1634. if (_abPath && ! _abPath->eligible(now, _ackSendInterval)) { // Implicit ZT_MULTIPATH_RESELECTION_POLICY_FAILURE
  1635. _abPath->address().toString(curPathStr);
  1636. sprintf(
  1637. traceMsg,
  1638. "%s (active-backup) Link %s/%s to peer %llx has failed. Selecting new link from failover queue, there are %zu links in the queue",
  1639. OSUtils::humanReadableTimestamp().c_str(),
  1640. getLink(_abPath)->ifname().c_str(),
  1641. curPathStr,
  1642. (unsigned long long)(_peer->_id.address().toInt()),
  1643. _abFailoverQueue.size());
  1644. RR->t->bondStateMessage(NULL, traceMsg);
  1645. if (! _abFailoverQueue.empty()) {
  1646. dequeueNextActiveBackupPath(now);
  1647. _abPath->address().toString(curPathStr);
  1648. sprintf(
  1649. traceMsg,
  1650. "%s (active-backup) Active link to peer %llx has been switched to %s/%s",
  1651. OSUtils::humanReadableTimestamp().c_str(),
  1652. (unsigned long long)(_peer->_id.address().toInt()),
  1653. getLink(_abPath)->ifname().c_str(),
  1654. curPathStr);
  1655. RR->t->bondStateMessage(NULL, traceMsg);
  1656. }
  1657. else {
  1658. sprintf(traceMsg, "%s (active-backup) Failover queue is empty. No links to peer %llx to choose from", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1659. RR->t->bondStateMessage(NULL, traceMsg);
  1660. }
  1661. }
  1662. /**
  1663. * Detect change to prevent flopping during later optimization step.
  1664. */
  1665. if (prevActiveBackupPath != _abPath) {
  1666. _lastActiveBackupPathChange = now;
  1667. }
  1668. if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_ALWAYS) {
  1669. if (_abPath && ! getLink(_abPath)->primary() && getLink(_abFailoverQueue.front())->primary()) {
  1670. dequeueNextActiveBackupPath(now);
  1671. _abPath->address().toString(curPathStr);
  1672. sprintf(
  1673. traceMsg,
  1674. "%s (active-backup) Switching back to available primary link %s/%s to peer %llx [linkSelectionMethod = always]",
  1675. OSUtils::humanReadableTimestamp().c_str(),
  1676. getLink(_abPath)->ifname().c_str(),
  1677. curPathStr,
  1678. (unsigned long long)(_peer->_id.address().toInt()));
  1679. RR->t->bondStateMessage(NULL, traceMsg);
  1680. }
  1681. }
  1682. if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_BETTER) {
  1683. if (_abPath && ! getLink(_abPath)->primary()) {
  1684. // Active backup has switched to "better" primary link according to re-select policy.
  1685. if (getLink(_abFailoverQueue.front())->primary() && (_abFailoverQueue.front()->_failoverScore > _abPath->_failoverScore)) {
  1686. dequeueNextActiveBackupPath(now);
  1687. _abPath->address().toString(curPathStr);
  1688. sprintf(
  1689. traceMsg,
  1690. "%s (active-backup) Switching back to user-defined primary link %s/%s to peer %llx [linkSelectionMethod = better]",
  1691. OSUtils::humanReadableTimestamp().c_str(),
  1692. getLink(_abPath)->ifname().c_str(),
  1693. curPathStr,
  1694. (unsigned long long)(_peer->_id.address().toInt()));
  1695. RR->t->bondStateMessage(NULL, traceMsg);
  1696. }
  1697. }
  1698. }
  1699. if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE && ! _abFailoverQueue.empty()) {
  1700. /**
  1701. * Implement link negotiation that was previously-decided
  1702. */
  1703. if (_abFailoverQueue.front()->_negotiated) {
  1704. dequeueNextActiveBackupPath(now);
  1705. _abPath->address().toString(prevPathStr);
  1706. _lastPathNegotiationCheck = now;
  1707. _abPath->address().toString(curPathStr);
  1708. sprintf(
  1709. traceMsg,
  1710. "%s (active-backup) Switching negotiated link %s/%s to peer %llx [linkSelectionMethod = optimize]",
  1711. OSUtils::humanReadableTimestamp().c_str(),
  1712. getLink(_abPath)->ifname().c_str(),
  1713. curPathStr,
  1714. (unsigned long long)(_peer->_id.address().toInt()));
  1715. RR->t->bondStateMessage(NULL, traceMsg);
  1716. }
  1717. else {
  1718. // Try to find a better path and automatically switch to it -- not too often, though.
  1719. if ((now - _lastActiveBackupPathChange) > ZT_MULTIPATH_MIN_ACTIVE_BACKUP_AUTOFLOP_INTERVAL) {
  1720. if (! _abFailoverQueue.empty()) {
  1721. int newFScore = _abFailoverQueue.front()->_failoverScore;
  1722. int prevFScore = _abPath->_failoverScore;
  1723. // Establish a minimum switch threshold to prevent flapping
  1724. int failoverScoreDifference = _abFailoverQueue.front()->_failoverScore - _abPath->_failoverScore;
  1725. int thresholdQuantity = (int)(ZT_MULTIPATH_ACTIVE_BACKUP_OPTIMIZE_MIN_THRESHOLD * (float)_abPath->_allocation);
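// Illustrative example: if the threshold constant were 0.10 and the current path's allocation
// were 100, a candidate would need a failover score at least 11 points higher to trigger a switch.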
  1726. if ((failoverScoreDifference > 0) && (failoverScoreDifference > thresholdQuantity)) {
  1727. SharedPtr<Path> oldPath = _abPath;
  1728. _abPath->address().toString(prevPathStr);
  1729. dequeueNextActiveBackupPath(now);
  1730. _abPath->address().toString(curPathStr);
  1731. sprintf(
  1732. traceMsg,
  1733. "%s (active-backup) Switching from %s/%s (fscore=%d) to better link %s/%s (fscore=%d) for peer %llx [linkSelectionMethod = optimize]",
  1734. OSUtils::humanReadableTimestamp().c_str(),
  1735. getLink(oldPath)->ifname().c_str(),
  1736. prevPathStr,
  1737. prevFScore,
  1738. getLink(_abPath)->ifname().c_str(),
  1739. curPathStr,
  1740. newFScore,
  1741. (unsigned long long)(_peer->_id.address().toInt()));
  1742. RR->t->bondStateMessage(NULL, traceMsg);
  1743. }
  1744. }
  1745. }
  1746. }
  1747. }
  1748. }
  1749. void Bond::setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool useTemplate)
  1750. {
  1751. // If invalid bonding policy, try default
  1752. int _defaultBondingPolicy = BondController::defaultBondingPolicy();
  1753. if (policy <= ZT_BONDING_POLICY_NONE || policy > ZT_BONDING_POLICY_BALANCE_AWARE) {
  1754. // If no default set, use NONE (effectively disabling this bond)
if (_defaultBondingPolicy < ZT_BONDING_POLICY_NONE || _defaultBondingPolicy > ZT_BONDING_POLICY_BALANCE_AWARE) {
_bondingPolicy = ZT_BONDING_POLICY_NONE;
}
else {
_bondingPolicy = _defaultBondingPolicy;
}
  1759. }
  1760. else {
  1761. _bondingPolicy = policy;
  1762. }
  1763. _freeRandomByte = 0;
  1764. _userHasSpecifiedPrimaryLink = false;
  1765. _userHasSpecifiedFailoverInstructions = false;
  1766. _isHealthy = false;
  1767. _numAliveLinks = 0;
  1768. _numTotalLinks = 0;
  1769. _downDelay = 0;
  1770. _upDelay = 0;
  1771. _allowFlowHashing = false;
  1772. _bondMonitorInterval = 0;
  1773. _shouldCollectPathStatistics = false;
  1774. // Path negotiation
  1775. _allowPathNegotiation = false;
  1776. _pathNegotiationCutoffCount = 0;
  1777. _localUtility = 0;
  1778. _numBondedPaths = 0;
  1779. _rrPacketsSentOnCurrLink = 0;
  1780. _rrIdx = 0;
  1781. _totalBondUnderload = 0;
  1782. _maxAcceptableLatency = 100;
  1783. _maxAcceptablePacketDelayVariance = 50;
  1784. _maxAcceptablePacketLossRatio = 0.10f;
  1785. _maxAcceptablePacketErrorRatio = 0.10f;
  1786. _userHasSpecifiedLinkSpeeds = 0;
/* ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE is the most conservative strategy and the
least likely to cause unexpected behavior; AGGRESSIVE is used here so that flows are returned
to recovered or better-performing links more quickly */
  1789. _flowRebalanceStrategy = ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_AGGRESSIVE;
  1790. /**
  1791. * Paths are actively monitored to provide a real-time quality/preference-ordered rapid failover queue.
  1792. */
  1793. switch (policy) {
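// Each _qualityWeights entry below controls how strongly one measured metric (latency, delay
// variance, loss, error rate, throughput, and related statistics) contributes to a path's
// relative quality and therefore to its traffic allocation.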
  1794. case ZT_BONDING_POLICY_ACTIVE_BACKUP:
  1795. _failoverInterval = 500;
  1796. _abLinkSelectMethod = ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE;
  1797. _linkMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
  1798. _qualityWeights[ZT_QOS_LAT_IDX] = 0.2f;
  1799. _qualityWeights[ZT_QOS_LTM_IDX] = 0.0f;
  1800. _qualityWeights[ZT_QOS_PDV_IDX] = 0.2f;
  1801. _qualityWeights[ZT_QOS_PLR_IDX] = 0.2f;
  1802. _qualityWeights[ZT_QOS_PER_IDX] = 0.2f;
  1803. _qualityWeights[ZT_QOS_THR_IDX] = 0.2f;
  1804. _qualityWeights[ZT_QOS_THM_IDX] = 0.0f;
  1805. _qualityWeights[ZT_QOS_THV_IDX] = 0.0f;
  1806. _qualityWeights[ZT_QOS_SCP_IDX] = 0.0f;
  1807. break;
  1808. /**
  1809. * All seemingly-alive paths are used. Paths are not actively monitored.
  1810. */
  1811. case ZT_BONDING_POLICY_BROADCAST:
  1812. _downDelay = 30000;
  1813. _upDelay = 0;
  1814. break;
  1815. /**
  1816. * Paths are monitored to determine when/if one needs to be added or removed from the rotation
  1817. */
  1818. case ZT_BONDING_POLICY_BALANCE_RR:
  1819. _failoverInterval = 3000;
  1820. _allowFlowHashing = false;
  1821. _packetsPerLink = 1024;
  1822. _linkMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
  1823. _qualityWeights[ZT_QOS_LAT_IDX] = 0.4f;
  1824. _qualityWeights[ZT_QOS_LTM_IDX] = 0.0f;
  1825. _qualityWeights[ZT_QOS_PDV_IDX] = 0.2f;
  1826. _qualityWeights[ZT_QOS_PLR_IDX] = 0.1f;
  1827. _qualityWeights[ZT_QOS_PER_IDX] = 0.1f;
  1828. _qualityWeights[ZT_QOS_THR_IDX] = 0.1f;
  1829. _qualityWeights[ZT_QOS_THM_IDX] = 0.0f;
  1830. _qualityWeights[ZT_QOS_THV_IDX] = 0.0f;
  1831. _qualityWeights[ZT_QOS_SCP_IDX] = 0.0f;
  1832. break;
  1833. /**
  1834. * Path monitoring is used to determine the capacity of each
  1835. * path and where to place the next flow.
  1836. */
  1837. case ZT_BONDING_POLICY_BALANCE_XOR:
  1838. _failoverInterval = 3000;
  1839. _upDelay = _bondMonitorInterval * 2;
  1840. _allowFlowHashing = true;
  1841. _linkMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
  1842. _qualityWeights[ZT_QOS_LAT_IDX] = 0.4f;
  1843. _qualityWeights[ZT_QOS_LTM_IDX] = 0.0f;
  1844. _qualityWeights[ZT_QOS_PDV_IDX] = 0.2f;
  1845. _qualityWeights[ZT_QOS_PLR_IDX] = 0.1f;
  1846. _qualityWeights[ZT_QOS_PER_IDX] = 0.1f;
  1847. _qualityWeights[ZT_QOS_THR_IDX] = 0.1f;
  1848. _qualityWeights[ZT_QOS_THM_IDX] = 0.0f;
  1849. _qualityWeights[ZT_QOS_THV_IDX] = 0.0f;
  1850. _qualityWeights[ZT_QOS_SCP_IDX] = 0.0f;
  1851. break;
  1852. /**
  1853. * Path monitoring is used to determine the capacity of each
  1854. * path and where to place the next flow. Additionally, re-shuffling
  1855. * of flows may take place.
  1856. */
  1857. case ZT_BONDING_POLICY_BALANCE_AWARE:
  1858. _failoverInterval = 3000;
  1859. _allowFlowHashing = true;
  1860. _linkMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
  1861. _qualityWeights[ZT_QOS_LAT_IDX] = 0.4f;
  1862. _qualityWeights[ZT_QOS_LTM_IDX] = 0.0f;
  1863. _qualityWeights[ZT_QOS_PDV_IDX] = 0.4f;
  1864. _qualityWeights[ZT_QOS_PLR_IDX] = 0.2f;
  1865. _qualityWeights[ZT_QOS_PER_IDX] = 0.0f;
  1866. _qualityWeights[ZT_QOS_THR_IDX] = 0.0f;
  1867. _qualityWeights[ZT_QOS_THM_IDX] = 0.0f;
  1868. _qualityWeights[ZT_QOS_THV_IDX] = 0.0f;
  1869. _qualityWeights[ZT_QOS_SCP_IDX] = 0.0f;
  1870. break;
  1871. default:
  1872. break;
  1873. }
  1874. /* If a user has specified custom parameters for this bonding policy, overlay
  1875. them onto the defaults that were previously set */
  1876. if (useTemplate) {
  1877. _policyAlias = templateBond->_policyAlias;
  1878. _failoverInterval = templateBond->_failoverInterval >= 250 ? templateBond->_failoverInterval : _failoverInterval;
  1879. _downDelay = templateBond->_downDelay;
  1880. _upDelay = templateBond->_upDelay;
  1881. if (templateBond->_linkMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_PASSIVE && templateBond->_failoverInterval != 0) {
  1882. // fprintf(stderr, "warning: passive path monitoring was specified, this will prevent failovers from happening in a timely manner.\n");
  1883. }
  1884. _abLinkSelectMethod = templateBond->_abLinkSelectMethod;
  1885. memcpy(_qualityWeights, templateBond->_qualityWeights, ZT_QOS_WEIGHT_SIZE * sizeof(float));
  1886. }
  1887. /* Set timer geometries */
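// Worked example (illustrative): with the active-backup default _failoverInterval of 500 ms,
// _bondMonitorInterval becomes 500 / 3 = 166 ms, _qosSendInterval 166 * 4 = 664 ms,
// _ackSendInterval 500 ms, and _qualityEstimationInterval 1000 ms.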
  1888. _bondMonitorInterval = _failoverInterval / 3;
  1889. BondController::setMinReqPathMonitorInterval(_bondMonitorInterval);
  1890. _ackSendInterval = _failoverInterval;
  1891. _qualityEstimationInterval = _failoverInterval * 2;
  1892. _dynamicPathMonitorInterval = 0;
  1893. _ackCutoffCount = 0;
  1894. _qosSendInterval = _bondMonitorInterval * 4;
  1895. _qosCutoffCount = 0;
  1896. throughputMeasurementInterval = _ackSendInterval * 2;
  1897. _defaultPathRefractoryPeriod = 8000;
  1898. }
  1899. void Bond::setUserQualityWeights(float weights[], int len)
  1900. {
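// Accept only a complete weight vector that sums to approximately 1.0; otherwise the defaults
// chosen in setReasonableDefaults() remain in effect.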
  1901. if (len == ZT_QOS_WEIGHT_SIZE) {
  1902. float weightTotal = 0.0;
  1903. for (unsigned int i = 0; i < ZT_QOS_WEIGHT_SIZE; ++i) {
  1904. weightTotal += weights[i];
  1905. }
  1906. if (weightTotal > 0.99 && weightTotal < 1.01) {
  1907. memcpy(_qualityWeights, weights, len * sizeof(float));
  1908. }
  1909. }
  1910. }
  1911. bool Bond::relevant()
  1912. {
  1913. return false;
  1914. }
  1915. SharedPtr<Link> Bond::getLink(const SharedPtr<Path>& path)
  1916. {
  1917. return RR->bc->getLinkBySocket(_policyAlias, path->localSocket());
  1918. }
  1919. void Bond::dumpInfo(const int64_t now)
  1920. {
  1921. // Omitted
  1922. }
  1923. } // namespace ZeroTier