session.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package mux
  2. import (
  3. "io"
  4. "runtime"
  5. "sync"
  6. "time"
  7. "github.com/xtls/xray-core/common"
  8. "github.com/xtls/xray-core/common/buf"
  9. "github.com/xtls/xray-core/common/net"
  10. "github.com/xtls/xray-core/common/protocol"
  11. "github.com/xtls/xray-core/transport/pipe"
  12. )
  13. type SessionManager struct {
  14. sync.RWMutex
  15. sessions map[uint16]*Session
  16. count uint16
  17. closed bool
  18. }
  19. func NewSessionManager() *SessionManager {
  20. return &SessionManager{
  21. count: 0,
  22. sessions: make(map[uint16]*Session, 16),
  23. }
  24. }
  25. func (m *SessionManager) Closed() bool {
  26. m.RLock()
  27. defer m.RUnlock()
  28. return m.closed
  29. }
  30. func (m *SessionManager) Size() int {
  31. m.RLock()
  32. defer m.RUnlock()
  33. return len(m.sessions)
  34. }
  35. func (m *SessionManager) Count() int {
  36. m.RLock()
  37. defer m.RUnlock()
  38. return int(m.count)
  39. }
  40. func (m *SessionManager) Allocate() *Session {
  41. m.Lock()
  42. defer m.Unlock()
  43. if m.closed {
  44. return nil
  45. }
  46. m.count++
  47. s := &Session{
  48. ID: m.count,
  49. parent: m,
  50. }
  51. m.sessions[s.ID] = s
  52. return s
  53. }
  54. func (m *SessionManager) Add(s *Session) bool {
  55. m.Lock()
  56. defer m.Unlock()
  57. if m.closed {
  58. return false
  59. }
  60. m.count++
  61. m.sessions[s.ID] = s
  62. return true
  63. }
  64. func (m *SessionManager) Remove(locked bool, id uint16) {
  65. if !locked {
  66. m.Lock()
  67. defer m.Unlock()
  68. }
  69. locked = true
  70. if m.closed {
  71. return
  72. }
  73. delete(m.sessions, id)
  74. /*
  75. if len(m.sessions) == 0 {
  76. m.sessions = make(map[uint16]*Session, 16)
  77. }
  78. */
  79. }
  80. func (m *SessionManager) Get(id uint16) (*Session, bool) {
  81. m.RLock()
  82. defer m.RUnlock()
  83. if m.closed {
  84. return nil, false
  85. }
  86. s, found := m.sessions[id]
  87. return s, found
  88. }
  89. func (m *SessionManager) CloseIfNoSession() bool {
  90. m.Lock()
  91. defer m.Unlock()
  92. if m.closed {
  93. return true
  94. }
  95. if len(m.sessions) != 0 {
  96. return false
  97. }
  98. m.closed = true
  99. return true
  100. }
  101. func (m *SessionManager) Close() error {
  102. m.Lock()
  103. defer m.Unlock()
  104. if m.closed {
  105. return nil
  106. }
  107. m.closed = true
  108. for _, s := range m.sessions {
  109. s.Close(true)
  110. }
  111. m.sessions = nil
  112. return nil
  113. }
  114. // Session represents a client connection in a Mux connection.
  115. type Session struct {
  116. input buf.Reader
  117. output buf.Writer
  118. parent *SessionManager
  119. ID uint16
  120. transferType protocol.TransferType
  121. closed bool
  122. XUDP *XUDP
  123. }
  124. // Close closes all resources associated with this session.
  125. func (s *Session) Close(locked bool) error {
  126. if !locked {
  127. s.parent.Lock()
  128. defer s.parent.Unlock()
  129. }
  130. locked = true
  131. if s.closed {
  132. return nil
  133. }
  134. s.closed = true
  135. if s.XUDP == nil {
  136. common.Interrupt(s.input)
  137. common.Close(s.output)
  138. } else {
  139. // Stop existing handle(), then trigger writer.Close().
  140. // Note that s.output may be dispatcher.SizeStatWriter.
  141. s.input.(*pipe.Reader).ReturnAnError(io.EOF)
  142. runtime.Gosched()
  143. // If the error set by ReturnAnError still exists, clear it.
  144. s.input.(*pipe.Reader).Recover()
  145. XUDPManager.Lock()
  146. if s.XUDP.Status == Active {
  147. s.XUDP.Expire = time.Now().Add(time.Minute)
  148. s.XUDP.Status = Expiring
  149. newError("XUDP put ", s.XUDP.GlobalID).AtDebug().WriteToLog()
  150. }
  151. XUDPManager.Unlock()
  152. }
  153. s.parent.Remove(locked, s.ID)
  154. return nil
  155. }
  156. // NewReader creates a buf.Reader based on the transfer type of this Session.
  157. func (s *Session) NewReader(reader *buf.BufferedReader, dest *net.Destination) buf.Reader {
  158. if s.transferType == protocol.TransferTypeStream {
  159. return NewStreamReader(reader)
  160. }
  161. return NewPacketReader(reader, dest)
  162. }
  163. const (
  164. Initializing = 0
  165. Active = 1
  166. Expiring = 2
  167. )
  168. type XUDP struct {
  169. GlobalID [8]byte
  170. Status uint64
  171. Expire time.Time
  172. Mux *Session
  173. }
  174. func (x *XUDP) Interrupt() {
  175. common.Interrupt(x.Mux.input)
  176. common.Close(x.Mux.output)
  177. }
  178. var XUDPManager struct {
  179. sync.Mutex
  180. Map map[[8]byte]*XUDP
  181. }
  182. func init() {
  183. XUDPManager.Map = make(map[[8]byte]*XUDP)
  184. go func() {
  185. for {
  186. time.Sleep(time.Minute)
  187. now := time.Now()
  188. XUDPManager.Lock()
  189. for id, x := range XUDPManager.Map {
  190. if x.Status == Expiring && now.After(x.Expire) {
  191. x.Interrupt()
  192. delete(XUDPManager.Map, id)
  193. newError("XUDP del ", id).AtDebug().WriteToLog()
  194. }
  195. }
  196. XUDPManager.Unlock()
  197. }
  198. }()
  199. }