mux.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package splithttp
  2. import (
  3. "context"
  4. "math/rand"
  5. "sync/atomic"
  6. "time"
  7. "github.com/xtls/xray-core/common/errors"
  8. )
  9. type muxResource struct {
  10. Resource interface{}
  11. OpenRequests atomic.Int32
  12. leftUsage int32
  13. expirationTime time.Time
  14. }
  15. type muxManager struct {
  16. newResourceFn func() interface{}
  17. config Multiplexing
  18. concurrency int32
  19. connections int32
  20. instances []*muxResource
  21. }
  22. func NewMuxManager(config Multiplexing, newResource func() interface{}) *muxManager {
  23. return &muxManager{
  24. config: config,
  25. concurrency: config.GetNormalizedMaxConcurrency().roll(),
  26. connections: config.GetNormalizedMaxConnections().roll(),
  27. newResourceFn: newResource,
  28. instances: make([]*muxResource, 0),
  29. }
  30. }
  31. func (m *muxManager) GetResource(ctx context.Context) *muxResource {
  32. m.removeExpiredConnections(ctx)
  33. if m.connections > 0 && len(m.instances) < int(m.connections) {
  34. errors.LogDebug(ctx, "xmux: creating client, connections=", len(m.instances))
  35. return m.newResource()
  36. }
  37. if len(m.instances) == 0 {
  38. errors.LogDebug(ctx, "xmux: creating client because instances is empty, connections=", len(m.instances))
  39. return m.newResource()
  40. }
  41. clients := make([]*muxResource, 0)
  42. if m.concurrency > 0 {
  43. for _, client := range m.instances {
  44. openRequests := client.OpenRequests.Load()
  45. if openRequests < m.concurrency {
  46. clients = append(clients, client)
  47. }
  48. }
  49. } else {
  50. clients = m.instances
  51. }
  52. if len(clients) == 0 {
  53. errors.LogDebug(ctx, "xmux: creating client because concurrency was hit, total clients=", len(m.instances))
  54. return m.newResource()
  55. }
  56. client := clients[rand.Intn(len(clients))]
  57. if client.leftUsage > 0 {
  58. client.leftUsage -= 1
  59. }
  60. return client
  61. }
  62. func (m *muxManager) newResource() *muxResource {
  63. leftUsage := int32(-1)
  64. if x := m.config.GetNormalizedCMaxReuseTimes().roll(); x > 0 {
  65. leftUsage = x - 1
  66. }
  67. expirationTime := time.UnixMilli(0)
  68. if x := m.config.GetNormalizedCMaxLifetimeMs().roll(); x > 0 {
  69. expirationTime = time.Now().Add(time.Duration(x) * time.Millisecond)
  70. }
  71. client := &muxResource{
  72. Resource: m.newResourceFn(),
  73. leftUsage: leftUsage,
  74. expirationTime: expirationTime,
  75. }
  76. m.instances = append(m.instances, client)
  77. return client
  78. }
  79. func (m *muxManager) removeExpiredConnections(ctx context.Context) {
  80. for i := 0; i < len(m.instances); i++ {
  81. client := m.instances[i]
  82. if client.leftUsage == 0 || (client.expirationTime != time.UnixMilli(0) && time.Now().After(client.expirationTime)) {
  83. errors.LogDebug(ctx, "xmux: removing client, leftUsage = ", client.leftUsage, ", expirationTime = ", client.expirationTime)
  84. m.instances = append(m.instances[:i], m.instances[i+1:]...)
  85. i--
  86. }
  87. }
  88. }