dialer.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package quic
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/quic-go/quic-go"
  7. "github.com/quic-go/quic-go/logging"
  8. "github.com/quic-go/quic-go/qlog"
  9. "github.com/xtls/xray-core/common"
  10. "github.com/xtls/xray-core/common/net"
  11. "github.com/xtls/xray-core/common/task"
  12. "github.com/xtls/xray-core/transport/internet"
  13. "github.com/xtls/xray-core/transport/internet/stat"
  14. "github.com/xtls/xray-core/transport/internet/tls"
  15. )
  16. type connectionContext struct {
  17. rawConn *sysConn
  18. conn quic.Connection
  19. }
  20. var errConnectionClosed = newError("connection closed")
  21. func (c *connectionContext) openStream(destAddr net.Addr) (*interConn, error) {
  22. if !isActive(c.conn) {
  23. return nil, errConnectionClosed
  24. }
  25. stream, err := c.conn.OpenStream()
  26. if err != nil {
  27. return nil, err
  28. }
  29. conn := &interConn{
  30. stream: stream,
  31. local: c.conn.LocalAddr(),
  32. remote: destAddr,
  33. }
  34. return conn, nil
  35. }
  36. type clientConnections struct {
  37. access sync.Mutex
  38. conns map[net.Destination][]*connectionContext
  39. cleanup *task.Periodic
  40. }
  41. func isActive(s quic.Connection) bool {
  42. select {
  43. case <-s.Context().Done():
  44. return false
  45. default:
  46. return true
  47. }
  48. }
  49. func removeInactiveConnections(conns []*connectionContext) []*connectionContext {
  50. activeConnections := make([]*connectionContext, 0, len(conns))
  51. for i, s := range conns {
  52. if isActive(s.conn) {
  53. activeConnections = append(activeConnections, s)
  54. continue
  55. }
  56. newError("closing quic connection at index: ", i).WriteToLog()
  57. if err := s.conn.CloseWithError(0, ""); err != nil {
  58. newError("failed to close connection").Base(err).WriteToLog()
  59. }
  60. if err := s.rawConn.Close(); err != nil {
  61. newError("failed to close raw connection").Base(err).WriteToLog()
  62. }
  63. }
  64. if len(activeConnections) < len(conns) {
  65. newError("active quic connection reduced from ", len(conns), " to ", len(activeConnections)).WriteToLog()
  66. return activeConnections
  67. }
  68. return conns
  69. }
  70. func (s *clientConnections) cleanConnections() error {
  71. s.access.Lock()
  72. defer s.access.Unlock()
  73. if len(s.conns) == 0 {
  74. return nil
  75. }
  76. newConnMap := make(map[net.Destination][]*connectionContext)
  77. for dest, conns := range s.conns {
  78. conns = removeInactiveConnections(conns)
  79. if len(conns) > 0 {
  80. newConnMap[dest] = conns
  81. }
  82. }
  83. s.conns = newConnMap
  84. return nil
  85. }
  86. func (s *clientConnections) openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) {
  87. s.access.Lock()
  88. defer s.access.Unlock()
  89. if s.conns == nil {
  90. s.conns = make(map[net.Destination][]*connectionContext)
  91. }
  92. dest := net.DestinationFromAddr(destAddr)
  93. var conns []*connectionContext
  94. if s, found := s.conns[dest]; found {
  95. conns = s
  96. }
  97. if len(conns) > 0 {
  98. s := conns[len(conns)-1]
  99. if isActive(s.conn) {
  100. conn, err := s.openStream(destAddr)
  101. if err == nil {
  102. return conn, nil
  103. }
  104. newError("failed to openStream: ").Base(err).WriteToLog()
  105. } else {
  106. newError("current quic connection is not active!").WriteToLog()
  107. }
  108. }
  109. conns = removeInactiveConnections(conns)
  110. newError("dialing quic to ", dest).WriteToLog()
  111. rawConn, err := internet.DialSystem(ctx, dest, sockopt)
  112. if err != nil {
  113. return nil, newError("failed to dial to dest: ", err).AtWarning().Base(err)
  114. }
  115. quicConfig := &quic.Config{
  116. KeepAlivePeriod: 0,
  117. HandshakeIdleTimeout: time.Second * 8,
  118. MaxIdleTimeout: time.Second * 300,
  119. Tracer: func(ctx context.Context, p logging.Perspective, ci quic.ConnectionID) *logging.ConnectionTracer {
  120. return qlog.NewConnectionTracer(&QlogWriter{connID: ci}, p, ci)
  121. },
  122. }
  123. var udpConn *net.UDPConn
  124. switch conn := rawConn.(type) {
  125. case *net.UDPConn:
  126. udpConn = conn
  127. case *internet.PacketConnWrapper:
  128. udpConn = conn.Conn.(*net.UDPConn)
  129. default:
  130. // TODO: Support sockopt for QUIC
  131. rawConn.Close()
  132. return nil, newError("QUIC with sockopt is unsupported").AtWarning()
  133. }
  134. sysConn, err := wrapSysConn(udpConn, config)
  135. if err != nil {
  136. rawConn.Close()
  137. return nil, err
  138. }
  139. tr := quic.Transport{
  140. ConnectionIDLength: 12,
  141. Conn: sysConn,
  142. }
  143. conn, err := tr.Dial(context.Background(), destAddr, tlsConfig.GetTLSConfig(tls.WithDestination(dest)), quicConfig)
  144. if err != nil {
  145. sysConn.Close()
  146. return nil, err
  147. }
  148. context := &connectionContext{
  149. conn: conn,
  150. rawConn: sysConn,
  151. }
  152. s.conns[dest] = append(conns, context)
  153. return context.openStream(destAddr)
  154. }
  155. var client clientConnections
  156. func init() {
  157. client.conns = make(map[net.Destination][]*connectionContext)
  158. client.cleanup = &task.Periodic{
  159. Interval: time.Minute,
  160. Execute: client.cleanConnections,
  161. }
  162. common.Must(client.cleanup.Start())
  163. }
  164. func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
  165. tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
  166. if tlsConfig == nil {
  167. tlsConfig = &tls.Config{
  168. ServerName: internalDomain,
  169. AllowInsecure: true,
  170. }
  171. }
  172. var destAddr *net.UDPAddr
  173. if dest.Address.Family().IsIP() {
  174. destAddr = &net.UDPAddr{
  175. IP: dest.Address.IP(),
  176. Port: int(dest.Port),
  177. }
  178. } else {
  179. dialerIp := internet.DestIpAddress()
  180. if dialerIp != nil {
  181. destAddr = &net.UDPAddr{
  182. IP: dialerIp,
  183. Port: int(dest.Port),
  184. }
  185. newError("quic Dial use dialer dest addr: ", destAddr).WriteToLog()
  186. } else {
  187. addr, err := net.ResolveUDPAddr("udp", dest.NetAddr())
  188. if err != nil {
  189. return nil, err
  190. }
  191. destAddr = addr
  192. }
  193. }
  194. config := streamSettings.ProtocolSettings.(*Config)
  195. return client.openConnection(ctx, destAddr, config, tlsConfig, streamSettings.SocketSettings)
  196. }
  197. func init() {
  198. common.Must(internet.RegisterTransportDialer(protocolName, Dial))
  199. }