kcp_misc.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package connections
  7. import (
  8. "bytes"
  9. "encoding/binary"
  10. "net"
  11. "sort"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. "github.com/AudriusButkevicius/pfilter"
  16. "github.com/hashicorp/yamux"
  17. )
  18. var (
  19. mut sync.Mutex
  20. filters filterList
  21. )
  22. type filterList []*pfilter.PacketFilter
  23. // Sort connections by wether the are unspecified or not, as connections
  24. // listenin on all addresses are more useful.
  25. func (f filterList) Len() int { return len(f) }
  26. func (f filterList) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
  27. func (f filterList) Less(i, j int) bool {
  28. iIsUnspecified := false
  29. jIsUnspecified := false
  30. if host, _, err := net.SplitHostPort(f[i].LocalAddr().String()); err == nil {
  31. iIsUnspecified = net.ParseIP(host).IsUnspecified()
  32. }
  33. if host, _, err := net.SplitHostPort(f[j].LocalAddr().String()); err == nil {
  34. jIsUnspecified = net.ParseIP(host).IsUnspecified()
  35. }
  36. return (iIsUnspecified && !jIsUnspecified) || (iIsUnspecified && jIsUnspecified)
  37. }
  38. // As we open listen KCP connections, we register them here, so that Dial calls through
  39. // KCP could reuse them. This way we will hopefully work around restricted NATs by
  40. // dialing via the same connection we are listening on, creating a mapping on our NAT
  41. // to that IP, and hoping that the other end will try to dial our listen address and
  42. // using the mapping we've established when we dialed.
  43. func getDialingFilter() *pfilter.PacketFilter {
  44. mut.Lock()
  45. defer mut.Unlock()
  46. if len(filters) == 0 {
  47. return nil
  48. }
  49. return filters[0]
  50. }
  51. func registerFilter(filter *pfilter.PacketFilter) {
  52. mut.Lock()
  53. defer mut.Unlock()
  54. filters = append(filters, filter)
  55. sort.Sort(filterList(filters))
  56. }
  57. func deregisterFilter(filter *pfilter.PacketFilter) {
  58. mut.Lock()
  59. defer mut.Unlock()
  60. for i, f := range filters {
  61. if f == filter {
  62. copy(filters[i:], filters[i+1:])
  63. filters[len(filters)-1] = nil
  64. filters = filters[:len(filters)-1]
  65. break
  66. }
  67. }
  68. sort.Sort(filterList(filters))
  69. }
  70. // Filters
  71. type kcpConversationFilter struct {
  72. convID uint32
  73. }
  74. func (f *kcpConversationFilter) Outgoing(out []byte, addr net.Addr) {
  75. if !f.isKCPConv(out) {
  76. panic("not a kcp conversation")
  77. }
  78. atomic.StoreUint32(&f.convID, binary.LittleEndian.Uint32(out[:4]))
  79. }
  80. func (kcpConversationFilter) isKCPConv(data []byte) bool {
  81. // Need atleast 5 bytes
  82. if len(data) < 5 {
  83. return false
  84. }
  85. // First 4 bytes convID
  86. // 5th byte is cmd
  87. // IKCP_CMD_PUSH = 81 // cmd: push data
  88. // IKCP_CMD_ACK = 82 // cmd: ack
  89. // IKCP_CMD_WASK = 83 // cmd: window probe (ask)
  90. // IKCP_CMD_WINS = 84 // cmd: window size (tell)
  91. return 80 < data[4] && data[4] < 85
  92. }
  93. func (f *kcpConversationFilter) ClaimIncoming(in []byte, addr net.Addr) bool {
  94. if f.isKCPConv(in) {
  95. convID := atomic.LoadUint32(&f.convID)
  96. return convID != 0 && binary.LittleEndian.Uint32(in[:4]) == convID
  97. }
  98. return false
  99. }
  100. type stunFilter struct {
  101. ids map[string]time.Time
  102. mut sync.Mutex
  103. }
  104. func (f *stunFilter) Outgoing(out []byte, addr net.Addr) {
  105. if !f.isStunPayload(out) {
  106. panic("not a stun payload")
  107. }
  108. id := string(out[8:20])
  109. f.mut.Lock()
  110. f.ids[id] = time.Now().Add(time.Minute)
  111. f.reap()
  112. f.mut.Unlock()
  113. }
  114. func (f *stunFilter) ClaimIncoming(in []byte, addr net.Addr) bool {
  115. if f.isStunPayload(in) {
  116. id := string(in[8:20])
  117. f.mut.Lock()
  118. _, ok := f.ids[id]
  119. f.reap()
  120. f.mut.Unlock()
  121. return ok
  122. }
  123. return false
  124. }
  125. func (f *stunFilter) isStunPayload(data []byte) bool {
  126. // Need atleast 20 bytes
  127. if len(data) < 20 {
  128. return false
  129. }
  130. // First two bits always unset, and should always send magic cookie.
  131. return data[0]&0xc0 == 0 && bytes.Equal(data[4:8], []byte{0x21, 0x12, 0xA4, 0x42})
  132. }
  133. func (f *stunFilter) reap() {
  134. now := time.Now()
  135. for id, timeout := range f.ids {
  136. if timeout.Before(now) {
  137. delete(f.ids, id)
  138. }
  139. }
  140. }
  141. type sessionClosingStream struct {
  142. *yamux.Stream
  143. session *yamux.Session
  144. }
  145. func (w *sessionClosingStream) Close() error {
  146. err1 := w.Stream.Close()
  147. deadline := time.Now().Add(5 * time.Second)
  148. for w.session.NumStreams() > 0 && time.Now().Before(deadline) {
  149. time.Sleep(200 * time.Millisecond)
  150. }
  151. err2 := w.session.Close()
  152. if err1 != nil {
  153. return err1
  154. }
  155. return err2
  156. }