session.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package mux
  2. import (
  3. "sync"
  4. "github.com/xtls/xray-core/common"
  5. "github.com/xtls/xray-core/common/buf"
  6. "github.com/xtls/xray-core/common/protocol"
  7. )
  8. type SessionManager struct {
  9. sync.RWMutex
  10. sessions map[uint16]*Session
  11. count uint16
  12. closed bool
  13. }
  14. func NewSessionManager() *SessionManager {
  15. return &SessionManager{
  16. count: 0,
  17. sessions: make(map[uint16]*Session, 16),
  18. }
  19. }
  20. func (m *SessionManager) Closed() bool {
  21. m.RLock()
  22. defer m.RUnlock()
  23. return m.closed
  24. }
  25. func (m *SessionManager) Size() int {
  26. m.RLock()
  27. defer m.RUnlock()
  28. return len(m.sessions)
  29. }
  30. func (m *SessionManager) Count() int {
  31. m.RLock()
  32. defer m.RUnlock()
  33. return int(m.count)
  34. }
  35. func (m *SessionManager) Allocate() *Session {
  36. m.Lock()
  37. defer m.Unlock()
  38. if m.closed {
  39. return nil
  40. }
  41. m.count++
  42. s := &Session{
  43. ID: m.count,
  44. parent: m,
  45. }
  46. m.sessions[s.ID] = s
  47. return s
  48. }
  49. func (m *SessionManager) Add(s *Session) {
  50. m.Lock()
  51. defer m.Unlock()
  52. if m.closed {
  53. return
  54. }
  55. m.count++
  56. m.sessions[s.ID] = s
  57. }
  58. func (m *SessionManager) Remove(id uint16) {
  59. m.Lock()
  60. defer m.Unlock()
  61. if m.closed {
  62. return
  63. }
  64. delete(m.sessions, id)
  65. if len(m.sessions) == 0 {
  66. m.sessions = make(map[uint16]*Session, 16)
  67. }
  68. }
  69. func (m *SessionManager) Get(id uint16) (*Session, bool) {
  70. m.RLock()
  71. defer m.RUnlock()
  72. if m.closed {
  73. return nil, false
  74. }
  75. s, found := m.sessions[id]
  76. return s, found
  77. }
  78. func (m *SessionManager) CloseIfNoSession() bool {
  79. m.Lock()
  80. defer m.Unlock()
  81. if m.closed {
  82. return true
  83. }
  84. if len(m.sessions) != 0 {
  85. return false
  86. }
  87. m.closed = true
  88. return true
  89. }
  90. func (m *SessionManager) Close() error {
  91. m.Lock()
  92. defer m.Unlock()
  93. if m.closed {
  94. return nil
  95. }
  96. m.closed = true
  97. for _, s := range m.sessions {
  98. common.Close(s.input)
  99. common.Close(s.output)
  100. }
  101. m.sessions = nil
  102. return nil
  103. }
  104. // Session represents a client connection in a Mux connection.
  105. type Session struct {
  106. input buf.Reader
  107. output buf.Writer
  108. parent *SessionManager
  109. ID uint16
  110. transferType protocol.TransferType
  111. }
  112. // Close closes all resources associated with this session.
  113. func (s *Session) Close() error {
  114. common.Close(s.output)
  115. common.Close(s.input)
  116. s.parent.Remove(s.ID)
  117. return nil
  118. }
  119. // NewReader creates a buf.Reader based on the transfer type of this Session.
  120. func (s *Session) NewReader(reader *buf.BufferedReader) buf.Reader {
  121. if s.transferType == protocol.TransferTypeStream {
  122. return NewStreamReader(reader)
  123. }
  124. return NewPacketReader(reader)
  125. }