relay_manager.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package nebula
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync/atomic"
  7. "github.com/sirupsen/logrus"
  8. "github.com/slackhq/nebula/config"
  9. "github.com/slackhq/nebula/header"
  10. "github.com/slackhq/nebula/iputil"
  11. )
  12. type relayManager struct {
  13. l *logrus.Logger
  14. hostmap *HostMap
  15. amRelay atomic.Bool
  16. }
  // NewRelayManager creates a relayManager bound to hostmap, applies the initial
// relay settings from c, and registers a reload callback so later changes to
// the relay configuration are picked up.
//
// ctx is accepted for API compatibility but is not used by this constructor.
func NewRelayManager(ctx context.Context, l *logrus.Logger, hostmap *HostMap, c *config.C) *relayManager {
	rm := &relayManager{
		l:       l,
		hostmap: hostmap,
	}

	// Do not silently drop an error from the initial load; report it the same
	// way the reload callback below does.
	if err := rm.reload(c, true); err != nil {
		l.WithError(err).Error("Failed to load relay_manager")
	}

	c.RegisterReloadCallback(func(c *config.C) {
		if err := rm.reload(c, false); err != nil {
			l.WithError(err).Error("Failed to reload relay_manager")
		}
	})
	return rm
}
  31. func (rm *relayManager) reload(c *config.C, initial bool) error {
  32. if initial || c.HasChanged("relay.am_relay") {
  33. rm.setAmRelay(c.GetBool("relay.am_relay", false))
  34. }
  35. return nil
  36. }
  37. func (rm *relayManager) GetAmRelay() bool {
  38. return rm.amRelay.Load()
  39. }
  40. func (rm *relayManager) setAmRelay(v bool) {
  41. rm.amRelay.Store(v)
  42. }
  43. // AddRelay finds an available relay index on the hostmap, and associates the relay info with it.
  44. // relayHostInfo is the Nebula peer which can be used as a relay to access the target vpnIp.
  45. func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp iputil.VpnIp, remoteIdx *uint32, relayType int, state int) (uint32, error) {
  46. hm.Lock()
  47. defer hm.Unlock()
  48. for i := 0; i < 32; i++ {
  49. index, err := generateIndex(l)
  50. if err != nil {
  51. return 0, err
  52. }
  53. _, inRelays := hm.Relays[index]
  54. if !inRelays {
  55. // Avoid standing up a relay that can't be used since only the primary hostinfo
  56. // will be pointed to by the relay logic
  57. //TODO: if there was an existing primary and it had relay state, should we merge?
  58. hm.unlockedMakePrimary(relayHostInfo)
  59. hm.Relays[index] = relayHostInfo
  60. newRelay := Relay{
  61. Type: relayType,
  62. State: state,
  63. LocalIndex: index,
  64. PeerIp: vpnIp,
  65. }
  66. if remoteIdx != nil {
  67. newRelay.RemoteIndex = *remoteIdx
  68. }
  69. relayHostInfo.relayState.InsertRelay(vpnIp, index, &newRelay)
  70. return index, nil
  71. }
  72. }
  73. return 0, errors.New("failed to generate unique localIndexId")
  74. }
  // EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic.
//
// The relay is looked up on relayHostInfo by m.InitiatorRelayIndex. On success
// its RemoteIndex is set to m.ResponderRelayIndex, its State is set to
// Established, and the updated relay is returned. If no relay has that index,
// the miss is logged and an error naming the index is returned.
func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) {
	relay, ok := relayHostInfo.relayState.QueryRelayForByIdx(m.InitiatorRelayIndex)
	if !ok {
		// Log the IPs as VpnIp (not raw uint32) so they read like every other
		// relay log line in this file.
		rm.l.WithFields(logrus.Fields{
			"relayHostInfo":       relayHostInfo.vpnIp,
			"initiatorRelayIndex": m.InitiatorRelayIndex,
			"relayFrom":           iputil.VpnIp(m.RelayFromIp),
			"relayTo":             iputil.VpnIp(m.RelayToIp)}).
			Info("relayManager EstablishRelay relayForByIdx not found")
		return nil, fmt.Errorf("unknown relay for initiator index %d", m.InitiatorRelayIndex)
	}

	// NOTE(review): relay is mutated here with no locking visible in this
	// function; concurrent readers of the same *Relay may race — confirm.
	relay.RemoteIndex = m.ResponderRelayIndex
	relay.State = Established
	return relay, nil
}
  90. func (rm *relayManager) HandleControlMsg(h *HostInfo, m *NebulaControl, f *Interface) {
  91. switch m.Type {
  92. case NebulaControl_CreateRelayRequest:
  93. rm.handleCreateRelayRequest(h, f, m)
  94. case NebulaControl_CreateRelayResponse:
  95. rm.handleCreateRelayResponse(h, f, m)
  96. }
  97. }
  98. func (rm *relayManager) handleCreateRelayResponse(h *HostInfo, f *Interface, m *NebulaControl) {
  99. rm.l.WithFields(logrus.Fields{
  100. "relayFrom": iputil.VpnIp(m.RelayFromIp),
  101. "relayTarget": iputil.VpnIp(m.RelayToIp),
  102. "initiatorIdx": m.InitiatorRelayIndex,
  103. "responderIdx": m.ResponderRelayIndex,
  104. "hostInfo": h.vpnIp}).
  105. Info("handleCreateRelayResponse")
  106. target := iputil.VpnIp(m.RelayToIp)
  107. relay, err := rm.EstablishRelay(h, m)
  108. if err != nil {
  109. rm.l.WithError(err).WithField("target", target.String()).Error("Failed to update relay for target")
  110. return
  111. }
  112. // Do I need to complete the relays now?
  113. if relay.Type == TerminalType {
  114. return
  115. }
  116. // I'm the middle man. Let the initiator know that the I've established the relay they requested.
  117. peerHostInfo, err := rm.hostmap.QueryVpnIp(relay.PeerIp)
  118. if err != nil {
  119. rm.l.WithError(err).WithField("relayPeerIp", relay.PeerIp).Error("Can't find a HostInfo for peer IP")
  120. return
  121. }
  122. peerRelay, ok := peerHostInfo.relayState.QueryRelayForByIp(target)
  123. if !ok {
  124. rm.l.WithField("peerIp", peerHostInfo.vpnIp).WithField("target", target.String()).Error("peerRelay does not have Relay state for target IP", peerHostInfo.vpnIp.String(), target.String())
  125. return
  126. }
  127. peerRelay.State = Established
  128. resp := NebulaControl{
  129. Type: NebulaControl_CreateRelayResponse,
  130. ResponderRelayIndex: peerRelay.LocalIndex,
  131. InitiatorRelayIndex: peerRelay.RemoteIndex,
  132. RelayFromIp: uint32(peerHostInfo.vpnIp),
  133. RelayToIp: uint32(target),
  134. }
  135. msg, err := resp.Marshal()
  136. if err != nil {
  137. rm.l.
  138. WithError(err).Error("relayManager Failed to marhsal Control CreateRelayResponse message to create relay")
  139. } else {
  140. f.SendMessageToVpnIp(header.Control, 0, peerHostInfo.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
  141. }
  142. }
  143. func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *NebulaControl) {
  144. rm.l.WithFields(logrus.Fields{
  145. "relayFrom": iputil.VpnIp(m.RelayFromIp),
  146. "relayTarget": iputil.VpnIp(m.RelayToIp),
  147. "initiatorIdx": m.InitiatorRelayIndex,
  148. "hostInfo": h.vpnIp}).
  149. Info("handleCreateRelayRequest")
  150. from := iputil.VpnIp(m.RelayFromIp)
  151. target := iputil.VpnIp(m.RelayToIp)
  152. // Is the target of the relay me?
  153. if target == f.myVpnIp {
  154. existingRelay, ok := h.relayState.QueryRelayForByIp(from)
  155. addRelay := !ok
  156. if ok {
  157. // Clean up existing relay, if this is a new request.
  158. if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
  159. // We got a brand new Relay request, because its index is different than what we saw before.
  160. // Clean up the existing Relay state, and get ready to record new Relay state.
  161. rm.hostmap.RemoveRelay(existingRelay.LocalIndex)
  162. addRelay = true
  163. }
  164. }
  165. if addRelay {
  166. _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
  167. if err != nil {
  168. return
  169. }
  170. }
  171. relay, ok := h.relayState.QueryRelayForByIp(from)
  172. if ok && m.InitiatorRelayIndex != relay.RemoteIndex {
  173. // Do something, Something happened.
  174. }
  175. resp := NebulaControl{
  176. Type: NebulaControl_CreateRelayResponse,
  177. ResponderRelayIndex: relay.LocalIndex,
  178. InitiatorRelayIndex: relay.RemoteIndex,
  179. RelayFromIp: uint32(from),
  180. RelayToIp: uint32(target),
  181. }
  182. msg, err := resp.Marshal()
  183. if err != nil {
  184. rm.l.
  185. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  186. } else {
  187. f.SendMessageToVpnIp(header.Control, 0, h.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
  188. }
  189. return
  190. } else {
  191. // the target is not me. Create a relay to the target, from me.
  192. if rm.GetAmRelay() == false {
  193. return
  194. }
  195. peer, err := rm.hostmap.QueryVpnIp(target)
  196. if err != nil {
  197. // Try to establish a connection to this host. If we get a future relay request,
  198. // we'll be ready!
  199. f.getOrHandshake(target)
  200. return
  201. }
  202. if peer.remote == nil {
  203. // Only create relays to peers for whom I have a direct connection
  204. return
  205. }
  206. sendCreateRequest := false
  207. var index uint32
  208. targetRelay, ok := peer.relayState.QueryRelayForByIp(from)
  209. if ok {
  210. index = targetRelay.LocalIndex
  211. if targetRelay.State == Requested {
  212. sendCreateRequest = true
  213. }
  214. } else {
  215. // Allocate an index in the hostMap for this relay peer
  216. index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested)
  217. if err != nil {
  218. return
  219. }
  220. sendCreateRequest = true
  221. }
  222. if sendCreateRequest {
  223. // Send a CreateRelayRequest to the peer.
  224. req := NebulaControl{
  225. Type: NebulaControl_CreateRelayRequest,
  226. InitiatorRelayIndex: index,
  227. RelayFromIp: uint32(h.vpnIp),
  228. RelayToIp: uint32(target),
  229. }
  230. msg, err := req.Marshal()
  231. if err != nil {
  232. rm.l.
  233. WithError(err).Error("relayManager Failed to marshal Control message to create relay")
  234. } else {
  235. f.SendMessageToVpnIp(header.Control, 0, target, msg, make([]byte, 12), make([]byte, mtu))
  236. }
  237. }
  238. // Also track the half-created Relay state just received
  239. relay, ok := h.relayState.QueryRelayForByIp(target)
  240. if !ok {
  241. // Add the relay
  242. state := Requested
  243. if targetRelay != nil && targetRelay.State == Established {
  244. state = Established
  245. }
  246. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, state)
  247. if err != nil {
  248. rm.l.
  249. WithError(err).Error("relayManager Failed to allocate a local index for relay")
  250. return
  251. }
  252. } else {
  253. if relay.RemoteIndex != m.InitiatorRelayIndex {
  254. // This is a stale Relay entry for the same tunnel targets.
  255. // Clean up the existing stuff.
  256. rm.RemoveRelay(relay.LocalIndex)
  257. // Add the new relay
  258. _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, Requested)
  259. if err != nil {
  260. return
  261. }
  262. relay, _ = h.relayState.QueryRelayForByIp(target)
  263. }
  264. switch relay.State {
  265. case Established:
  266. resp := NebulaControl{
  267. Type: NebulaControl_CreateRelayResponse,
  268. ResponderRelayIndex: relay.LocalIndex,
  269. InitiatorRelayIndex: relay.RemoteIndex,
  270. RelayFromIp: uint32(h.vpnIp),
  271. RelayToIp: uint32(target),
  272. }
  273. msg, err := resp.Marshal()
  274. if err != nil {
  275. rm.l.
  276. WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
  277. } else {
  278. f.SendMessageToVpnIp(header.Control, 0, h.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
  279. }
  280. case Requested:
  281. // Keep waiting for the other relay to complete
  282. }
  283. }
  284. }
  285. }
  286. func (rm *relayManager) RemoveRelay(localIdx uint32) {
  287. rm.hostmap.RemoveRelay(localIdx)
  288. }