handler_test.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package outbound_test
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "testing"
  8. "time"
  9. "github.com/xtls/xray-core/app/policy"
  10. "github.com/xtls/xray-core/app/proxyman"
  11. . "github.com/xtls/xray-core/app/proxyman/outbound"
  12. "github.com/xtls/xray-core/app/stats"
  13. "github.com/xtls/xray-core/common/net"
  14. "github.com/xtls/xray-core/common/serial"
  15. core "github.com/xtls/xray-core/core"
  16. "github.com/xtls/xray-core/features/outbound"
  17. "github.com/xtls/xray-core/proxy/freedom"
  18. "github.com/xtls/xray-core/transport/internet/stat"
  19. )
  20. func TestInterfaces(t *testing.T) {
  21. _ = (outbound.Handler)(new(Handler))
  22. _ = (outbound.Manager)(new(Manager))
  23. }
  24. const xrayKey core.XrayKey = 1
  25. func TestOutboundWithoutStatCounter(t *testing.T) {
  26. config := &core.Config{
  27. App: []*serial.TypedMessage{
  28. serial.ToTypedMessage(&stats.Config{}),
  29. serial.ToTypedMessage(&policy.Config{
  30. System: &policy.SystemPolicy{
  31. Stats: &policy.SystemPolicy_Stats{
  32. InboundUplink: true,
  33. },
  34. },
  35. }),
  36. },
  37. }
  38. v, _ := core.New(config)
  39. v.AddFeature((outbound.Manager)(new(Manager)))
  40. ctx := context.WithValue(context.Background(), xrayKey, v)
  41. h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
  42. Tag: "tag",
  43. ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
  44. })
  45. conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
  46. _, ok := conn.(*stat.CounterConnection)
  47. if ok {
  48. t.Errorf("Expected conn to not be CounterConnection")
  49. }
  50. }
  51. func TestOutboundWithStatCounter(t *testing.T) {
  52. config := &core.Config{
  53. App: []*serial.TypedMessage{
  54. serial.ToTypedMessage(&stats.Config{}),
  55. serial.ToTypedMessage(&policy.Config{
  56. System: &policy.SystemPolicy{
  57. Stats: &policy.SystemPolicy_Stats{
  58. OutboundUplink: true,
  59. OutboundDownlink: true,
  60. },
  61. },
  62. }),
  63. },
  64. }
  65. v, _ := core.New(config)
  66. v.AddFeature((outbound.Manager)(new(Manager)))
  67. ctx := context.WithValue(context.Background(), xrayKey, v)
  68. h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
  69. Tag: "tag",
  70. ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
  71. })
  72. conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
  73. _, ok := conn.(*stat.CounterConnection)
  74. if !ok {
  75. t.Errorf("Expected conn to be CounterConnection")
  76. }
  77. }
  78. func TestTagsCache(t *testing.T) {
  79. test_duration := 10 * time.Second
  80. threads_num := 50
  81. delay := 10 * time.Millisecond
  82. tags_prefix := "node"
  83. tags := sync.Map{}
  84. counter := atomic.Uint64{}
  85. ohm, err := New(context.Background(), &proxyman.OutboundConfig{})
  86. if err != nil {
  87. t.Error("failed to create outbound handler manager")
  88. }
  89. config := &core.Config{
  90. App: []*serial.TypedMessage{},
  91. }
  92. v, _ := core.New(config)
  93. v.AddFeature(ohm)
  94. ctx := context.WithValue(context.Background(), xrayKey, v)
  95. stop_add_rm := false
  96. wg_add_rm := sync.WaitGroup{}
  97. addHandlers := func() {
  98. defer wg_add_rm.Done()
  99. for !stop_add_rm {
  100. time.Sleep(delay)
  101. idx := counter.Add(1)
  102. tag := fmt.Sprintf("%s%d", tags_prefix, idx)
  103. cfg := &core.OutboundHandlerConfig{
  104. Tag: tag,
  105. ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
  106. }
  107. if h, err := NewHandler(ctx, cfg); err == nil {
  108. if err := ohm.AddHandler(ctx, h); err == nil {
  109. // t.Log("add handler:", tag)
  110. tags.Store(tag, nil)
  111. } else {
  112. t.Error("failed to add handler:", tag)
  113. }
  114. } else {
  115. t.Error("failed to create handler:", tag)
  116. }
  117. }
  118. }
  119. rmHandlers := func() {
  120. defer wg_add_rm.Done()
  121. for !stop_add_rm {
  122. time.Sleep(delay)
  123. tags.Range(func(key interface{}, value interface{}) bool {
  124. if _, ok := tags.LoadAndDelete(key); ok {
  125. // t.Log("remove handler:", key)
  126. ohm.RemoveHandler(ctx, key.(string))
  127. return false
  128. }
  129. return true
  130. })
  131. }
  132. }
  133. selectors := []string{tags_prefix}
  134. wg_get := sync.WaitGroup{}
  135. stop_get := false
  136. getTags := func() {
  137. defer wg_get.Done()
  138. for !stop_get {
  139. time.Sleep(delay)
  140. _ = ohm.Select(selectors)
  141. // t.Logf("get tags: %v", tag)
  142. }
  143. }
  144. for i := 0; i < threads_num; i++ {
  145. wg_add_rm.Add(2)
  146. go rmHandlers()
  147. go addHandlers()
  148. wg_get.Add(1)
  149. go getTags()
  150. }
  151. time.Sleep(test_duration)
  152. stop_add_rm = true
  153. wg_add_rm.Wait()
  154. stop_get = true
  155. wg_get.Wait()
  156. }