kcp_misc.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package connections
  7. import (
  8. "bytes"
  9. "encoding/binary"
  10. "net"
  11. "sort"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. "github.com/AudriusButkevicius/kcp-go"
  16. "github.com/AudriusButkevicius/pfilter"
  17. "github.com/xtaci/smux"
  18. )
  19. var (
  20. mut sync.Mutex
  21. filters filterList
  22. )
  23. func init() {
  24. kcp.BlacklistDuration = 10 * time.Minute
  25. }
  26. type filterList []*pfilter.PacketFilter
  27. // Sort connections by whether they are unspecified or not, as connections
  28. // listening on all addresses are more useful.
  29. func (f filterList) Len() int { return len(f) }
  30. func (f filterList) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
  31. func (f filterList) Less(i, j int) bool {
  32. iIsUnspecified := false
  33. jIsUnspecified := false
  34. if host, _, err := net.SplitHostPort(f[i].LocalAddr().String()); err == nil {
  35. iIsUnspecified = net.ParseIP(host).IsUnspecified()
  36. }
  37. if host, _, err := net.SplitHostPort(f[j].LocalAddr().String()); err == nil {
  38. jIsUnspecified = net.ParseIP(host).IsUnspecified()
  39. }
  40. return (iIsUnspecified && !jIsUnspecified) || (iIsUnspecified && jIsUnspecified)
  41. }
  42. // As we open listen KCP connections, we register them here, so that Dial calls through
  43. // KCP could reuse them. This way we will hopefully work around restricted NATs by
  44. // dialing via the same connection we are listening on, creating a mapping on our NAT
  45. // to that IP, and hoping that the other end will try to dial our listen address and
  46. // using the mapping we've established when we dialed.
  47. func getDialingFilter() *pfilter.PacketFilter {
  48. mut.Lock()
  49. defer mut.Unlock()
  50. if len(filters) == 0 {
  51. return nil
  52. }
  53. return filters[0]
  54. }
  55. func registerFilter(filter *pfilter.PacketFilter) {
  56. mut.Lock()
  57. defer mut.Unlock()
  58. filters = append(filters, filter)
  59. sort.Sort(filterList(filters))
  60. }
  61. func deregisterFilter(filter *pfilter.PacketFilter) {
  62. mut.Lock()
  63. defer mut.Unlock()
  64. for i, f := range filters {
  65. if f == filter {
  66. copy(filters[i:], filters[i+1:])
  67. filters[len(filters)-1] = nil
  68. filters = filters[:len(filters)-1]
  69. break
  70. }
  71. }
  72. sort.Sort(filterList(filters))
  73. }
  // Filters

  // kcpConversationFilter remembers the conversation ID seen on outgoing KCP
  // packets and claims incoming KCP packets that carry the same ID.
  type kcpConversationFilter struct {
  	// convID is read and written with sync/atomic only. Zero means that no
  	// outgoing packet has been recorded yet.
  	convID uint32
  }

  // Outgoing records the conversation ID (the first four bytes, little endian)
  // of the outgoing packet out. It panics if out does not look like a KCP
  // packet. addr is not used.
  func (f *kcpConversationFilter) Outgoing(out []byte, addr net.Addr) {
  	if !f.isKCPConv(out) {
  		panic("not a kcp conversation")
  	}
  	atomic.StoreUint32(&f.convID, binary.LittleEndian.Uint32(out[:4]))
  }

  // isKCPConv reports whether data looks like a KCP packet: at least five
  // bytes long with a known command in the fifth byte.
  //
  // The receiver is a pointer, like the other methods of this type. A value
  // receiver would copy the struct on every call, which is a plain read of
  // convID that races with the atomic store in Outgoing.
  func (f *kcpConversationFilter) isKCPConv(data []byte) bool {
  	// Need at least 5 bytes
  	if len(data) < 5 {
  		return false
  	}

  	// First 4 bytes convID
  	// 5th byte is cmd
  	// IKCP_CMD_PUSH = 81 // cmd: push data
  	// IKCP_CMD_ACK = 82 // cmd: ack
  	// IKCP_CMD_WASK = 83 // cmd: window probe (ask)
  	// IKCP_CMD_WINS = 84 // cmd: window size (tell)
  	cmd := data[4]
  	return 80 < cmd && cmd < 85
  }

  // ClaimIncoming reports whether the incoming packet in is a KCP packet whose
  // conversation ID equals the one recorded by Outgoing. Nothing is claimed
  // while the recorded ID is still zero. addr is not used.
  func (f *kcpConversationFilter) ClaimIncoming(in []byte, addr net.Addr) bool {
  	if !f.isKCPConv(in) {
  		return false
  	}
  	convID := atomic.LoadUint32(&f.convID)
  	return convID != 0 && binary.LittleEndian.Uint32(in[:4]) == convID
  }
  // stunFilter remembers the transaction IDs of outgoing STUN packets for one
  // minute and claims incoming STUN packets that carry one of those IDs.
  type stunFilter struct {
  	// ids maps a transaction ID (bytes 8 to 20 of the packet) to the time
  	// at which it expires. Guarded by mut.
  	ids map[string]time.Time
  	mut sync.Mutex
  }

  // Outgoing records the transaction ID of the outgoing STUN packet out with
  // an expiry one minute from now, then drops expired IDs. It panics if out
  // does not look like a STUN packet. addr is not used.
  func (f *stunFilter) Outgoing(out []byte, addr net.Addr) {
  	if !f.isStunPayload(out) {
  		panic("not a stun payload")
  	}

  	id := string(out[8:20])

  	f.mut.Lock()
  	defer f.mut.Unlock()
  	// Allocate lazily so that a zero-value stunFilter is usable; writing to
  	// a nil map would panic.
  	if f.ids == nil {
  		f.ids = make(map[string]time.Time)
  	}
  	f.ids[id] = time.Now().Add(time.Minute)
  	f.reap()
  }

  // ClaimIncoming reports whether the incoming packet in is a STUN packet
  // whose transaction ID was recorded by Outgoing and has not expired yet.
  // addr is not used.
  func (f *stunFilter) ClaimIncoming(in []byte, addr net.Addr) bool {
  	if !f.isStunPayload(in) {
  		return false
  	}

  	id := string(in[8:20])

  	f.mut.Lock()
  	defer f.mut.Unlock()
  	// Reap before the lookup so that an ID which has already expired is not
  	// claimed one last time.
  	f.reap()
  	_, ok := f.ids[id]
  	return ok
  }

  // isStunPayload reports whether data looks like a STUN packet: at least 20
  // bytes, the two highest bits of the first byte unset, and the magic cookie
  // in bytes 4 to 8.
  func (f *stunFilter) isStunPayload(data []byte) bool {
  	// Need at least 20 bytes
  	if len(data) < 20 {
  		return false
  	}

  	// First two bits always unset, and should always send magic cookie.
  	return data[0]&0xc0 == 0 && bytes.Equal(data[4:8], []byte{0x21, 0x12, 0xA4, 0x42})
  }

  // reap deletes every transaction ID whose expiry lies in the past. The
  // caller must hold f.mut.
  func (f *stunFilter) reap() {
  	now := time.Now()
  	for id, timeout := range f.ids {
  		if timeout.Before(now) {
  			delete(f.ids, id)
  		}
  	}
  }
  145. type sessionClosingStream struct {
  146. *smux.Stream
  147. session *smux.Session
  148. }
  149. func (w *sessionClosingStream) Close() error {
  150. err1 := w.Stream.Close()
  151. deadline := time.Now().Add(5 * time.Second)
  152. for w.session.NumStreams() > 0 && time.Now().Before(deadline) {
  153. time.Sleep(200 * time.Millisecond)
  154. }
  155. err2 := w.session.Close()
  156. if err1 != nil {
  157. return err1
  158. }
  159. return err2
  160. }
  // boolInt converts b to an integer: 1 for true, 0 for false.
  func boolInt(b bool) int {
  	n := 0
  	if b {
  		n = 1
  	}
  	return n
  }