handler_test.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package outbound_test
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "testing"
  8. "time"
  9. "github.com/xtls/xray-core/app/policy"
  10. "github.com/xtls/xray-core/app/proxyman"
  11. . "github.com/xtls/xray-core/app/proxyman/outbound"
  12. "github.com/xtls/xray-core/app/stats"
  13. "github.com/xtls/xray-core/common/net"
  14. "github.com/xtls/xray-core/common/serial"
  15. "github.com/xtls/xray-core/common/session"
  16. core "github.com/xtls/xray-core/core"
  17. "github.com/xtls/xray-core/features/outbound"
  18. "github.com/xtls/xray-core/proxy/freedom"
  19. "github.com/xtls/xray-core/transport/internet/stat"
  20. )
  21. func TestInterfaces(t *testing.T) {
  22. _ = (outbound.Handler)(new(Handler))
  23. _ = (outbound.Manager)(new(Manager))
  24. }
  25. const xrayKey core.XrayKey = 1
  26. func TestOutboundWithoutStatCounter(t *testing.T) {
  27. config := &core.Config{
  28. App: []*serial.TypedMessage{
  29. serial.ToTypedMessage(&stats.Config{}),
  30. serial.ToTypedMessage(&policy.Config{
  31. System: &policy.SystemPolicy{
  32. Stats: &policy.SystemPolicy_Stats{
  33. InboundUplink: true,
  34. },
  35. },
  36. }),
  37. },
  38. }
  39. v, _ := core.New(config)
  40. v.AddFeature((outbound.Manager)(new(Manager)))
  41. ctx := context.WithValue(context.Background(), xrayKey, v)
  42. ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{{}})
  43. h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
  44. Tag: "tag",
  45. ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
  46. })
  47. conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
  48. _, ok := conn.(*stat.CounterConnection)
  49. if ok {
  50. t.Errorf("Expected conn to not be CounterConnection")
  51. }
  52. }
  53. func TestOutboundWithStatCounter(t *testing.T) {
  54. config := &core.Config{
  55. App: []*serial.TypedMessage{
  56. serial.ToTypedMessage(&stats.Config{}),
  57. serial.ToTypedMessage(&policy.Config{
  58. System: &policy.SystemPolicy{
  59. Stats: &policy.SystemPolicy_Stats{
  60. OutboundUplink: true,
  61. OutboundDownlink: true,
  62. },
  63. },
  64. }),
  65. },
  66. }
  67. v, _ := core.New(config)
  68. v.AddFeature((outbound.Manager)(new(Manager)))
  69. ctx := context.WithValue(context.Background(), xrayKey, v)
  70. ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{{}})
  71. h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
  72. Tag: "tag",
  73. ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
  74. })
  75. conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
  76. _, ok := conn.(*stat.CounterConnection)
  77. if !ok {
  78. t.Errorf("Expected conn to be CounterConnection")
  79. }
  80. }
  81. func TestTagsCache(t *testing.T) {
  82. test_duration := 10 * time.Second
  83. threads_num := 50
  84. delay := 10 * time.Millisecond
  85. tags_prefix := "node"
  86. tags := sync.Map{}
  87. counter := atomic.Uint64{}
  88. ohm, err := New(context.Background(), &proxyman.OutboundConfig{})
  89. if err != nil {
  90. t.Error("failed to create outbound handler manager")
  91. }
  92. config := &core.Config{
  93. App: []*serial.TypedMessage{},
  94. }
  95. v, _ := core.New(config)
  96. v.AddFeature(ohm)
  97. ctx := context.WithValue(context.Background(), xrayKey, v)
  98. stop_add_rm := false
  99. wg_add_rm := sync.WaitGroup{}
  100. addHandlers := func() {
  101. defer wg_add_rm.Done()
  102. for !stop_add_rm {
  103. time.Sleep(delay)
  104. idx := counter.Add(1)
  105. tag := fmt.Sprintf("%s%d", tags_prefix, idx)
  106. cfg := &core.OutboundHandlerConfig{
  107. Tag: tag,
  108. ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
  109. }
  110. if h, err := NewHandler(ctx, cfg); err == nil {
  111. if err := ohm.AddHandler(ctx, h); err == nil {
  112. // t.Log("add handler:", tag)
  113. tags.Store(tag, nil)
  114. } else {
  115. t.Error("failed to add handler:", tag)
  116. }
  117. } else {
  118. t.Error("failed to create handler:", tag)
  119. }
  120. }
  121. }
  122. rmHandlers := func() {
  123. defer wg_add_rm.Done()
  124. for !stop_add_rm {
  125. time.Sleep(delay)
  126. tags.Range(func(key interface{}, value interface{}) bool {
  127. if _, ok := tags.LoadAndDelete(key); ok {
  128. // t.Log("remove handler:", key)
  129. ohm.RemoveHandler(ctx, key.(string))
  130. return false
  131. }
  132. return true
  133. })
  134. }
  135. }
  136. selectors := []string{tags_prefix}
  137. wg_get := sync.WaitGroup{}
  138. stop_get := false
  139. getTags := func() {
  140. defer wg_get.Done()
  141. for !stop_get {
  142. time.Sleep(delay)
  143. _ = ohm.Select(selectors)
  144. // t.Logf("get tags: %v", tag)
  145. }
  146. }
  147. for i := 0; i < threads_num; i++ {
  148. wg_add_rm.Add(2)
  149. go rmHandlers()
  150. go addHandlers()
  151. wg_get.Add(1)
  152. go getTags()
  153. }
  154. time.Sleep(test_duration)
  155. stop_add_rm = true
  156. wg_add_rm.Wait()
  157. stop_get = true
  158. wg_get.Wait()
  159. }