session.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package mux
  2. import (
  3. "context"
  4. "io"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "github.com/xtls/xray-core/common"
  9. "github.com/xtls/xray-core/common/buf"
  10. "github.com/xtls/xray-core/common/errors"
  11. "github.com/xtls/xray-core/common/net"
  12. "github.com/xtls/xray-core/common/protocol"
  13. "github.com/xtls/xray-core/transport/pipe"
  14. )
  15. type SessionManager struct {
  16. sync.RWMutex
  17. sessions map[uint16]*Session
  18. count uint16
  19. closed bool
  20. }
  21. func NewSessionManager() *SessionManager {
  22. return &SessionManager{
  23. count: 0,
  24. sessions: make(map[uint16]*Session, 16),
  25. }
  26. }
  27. func (m *SessionManager) Closed() bool {
  28. m.RLock()
  29. defer m.RUnlock()
  30. return m.closed
  31. }
  32. func (m *SessionManager) Size() int {
  33. m.RLock()
  34. defer m.RUnlock()
  35. return len(m.sessions)
  36. }
  37. func (m *SessionManager) Count() int {
  38. m.RLock()
  39. defer m.RUnlock()
  40. return int(m.count)
  41. }
  42. func (m *SessionManager) Allocate(Strategy *ClientStrategy) *Session {
  43. m.Lock()
  44. defer m.Unlock()
  45. MaxConcurrency := int(Strategy.MaxConcurrency)
  46. MaxConnection := uint16(Strategy.MaxConnection)
  47. if m.closed || (MaxConcurrency > 0 && len(m.sessions) >= MaxConcurrency) || (MaxConnection > 0 && m.count >= MaxConnection) {
  48. return nil
  49. }
  50. m.count++
  51. s := &Session{
  52. ID: m.count,
  53. parent: m,
  54. }
  55. m.sessions[s.ID] = s
  56. return s
  57. }
  58. func (m *SessionManager) Add(s *Session) bool {
  59. m.Lock()
  60. defer m.Unlock()
  61. if m.closed {
  62. return false
  63. }
  64. m.count++
  65. m.sessions[s.ID] = s
  66. return true
  67. }
  68. func (m *SessionManager) Remove(locked bool, id uint16) {
  69. if !locked {
  70. m.Lock()
  71. defer m.Unlock()
  72. }
  73. locked = true
  74. if m.closed {
  75. return
  76. }
  77. delete(m.sessions, id)
  78. /*
  79. if len(m.sessions) == 0 {
  80. m.sessions = make(map[uint16]*Session, 16)
  81. }
  82. */
  83. }
  84. func (m *SessionManager) Get(id uint16) (*Session, bool) {
  85. m.RLock()
  86. defer m.RUnlock()
  87. if m.closed {
  88. return nil, false
  89. }
  90. s, found := m.sessions[id]
  91. return s, found
  92. }
  93. func (m *SessionManager) CloseIfNoSession() bool {
  94. m.Lock()
  95. defer m.Unlock()
  96. if m.closed {
  97. return true
  98. }
  99. if len(m.sessions) != 0 {
  100. return false
  101. }
  102. m.closed = true
  103. return true
  104. }
  105. func (m *SessionManager) Close() error {
  106. m.Lock()
  107. defer m.Unlock()
  108. if m.closed {
  109. return nil
  110. }
  111. m.closed = true
  112. for _, s := range m.sessions {
  113. s.Close(true)
  114. }
  115. m.sessions = nil
  116. return nil
  117. }
  118. // Session represents a client connection in a Mux connection.
  119. type Session struct {
  120. input buf.Reader
  121. output buf.Writer
  122. parent *SessionManager
  123. ID uint16
  124. transferType protocol.TransferType
  125. closed bool
  126. XUDP *XUDP
  127. }
  128. // Close closes all resources associated with this session.
  129. func (s *Session) Close(locked bool) error {
  130. if !locked {
  131. s.parent.Lock()
  132. defer s.parent.Unlock()
  133. }
  134. locked = true
  135. if s.closed {
  136. return nil
  137. }
  138. s.closed = true
  139. if s.XUDP == nil {
  140. common.Interrupt(s.input)
  141. common.Close(s.output)
  142. } else {
  143. // Stop existing handle(), then trigger writer.Close().
  144. // Note that s.output may be dispatcher.SizeStatWriter.
  145. s.input.(*pipe.Reader).ReturnAnError(io.EOF)
  146. runtime.Gosched()
  147. // If the error set by ReturnAnError still exists, clear it.
  148. s.input.(*pipe.Reader).Recover()
  149. XUDPManager.Lock()
  150. if s.XUDP.Status == Active {
  151. s.XUDP.Expire = time.Now().Add(time.Minute)
  152. s.XUDP.Status = Expiring
  153. errors.LogDebug(context.Background(), "XUDP put ", s.XUDP.GlobalID)
  154. }
  155. XUDPManager.Unlock()
  156. }
  157. s.parent.Remove(locked, s.ID)
  158. return nil
  159. }
  160. // NewReader creates a buf.Reader based on the transfer type of this Session.
  161. func (s *Session) NewReader(reader *buf.BufferedReader, dest *net.Destination) buf.Reader {
  162. if s.transferType == protocol.TransferTypeStream {
  163. return NewStreamReader(reader)
  164. }
  165. return NewPacketReader(reader, dest)
  166. }
  167. const (
  168. Initializing = 0
  169. Active = 1
  170. Expiring = 2
  171. )
  172. type XUDP struct {
  173. GlobalID [8]byte
  174. Status uint64
  175. Expire time.Time
  176. Mux *Session
  177. }
  178. func (x *XUDP) Interrupt() {
  179. common.Interrupt(x.Mux.input)
  180. common.Close(x.Mux.output)
  181. }
  182. var XUDPManager struct {
  183. sync.Mutex
  184. Map map[[8]byte]*XUDP
  185. }
  186. func init() {
  187. XUDPManager.Map = make(map[[8]byte]*XUDP)
  188. go func() {
  189. for {
  190. time.Sleep(time.Minute)
  191. now := time.Now()
  192. XUDPManager.Lock()
  193. for id, x := range XUDPManager.Map {
  194. if x.Status == Expiring && now.After(x.Expire) {
  195. x.Interrupt()
  196. delete(XUDPManager.Map, id)
  197. errors.LogDebug(context.Background(), "XUDP del ", id)
  198. }
  199. }
  200. XUDPManager.Unlock()
  201. }
  202. }()
  203. }