balancing.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package router
  2. import (
  3. "context"
  4. sync "sync"
  5. "github.com/xtls/xray-core/app/observatory"
  6. "github.com/xtls/xray-core/common"
  7. "github.com/xtls/xray-core/common/errors"
  8. "github.com/xtls/xray-core/core"
  9. "github.com/xtls/xray-core/features/extension"
  10. "github.com/xtls/xray-core/features/outbound"
  11. )
  12. type BalancingStrategy interface {
  13. PickOutbound([]string) string
  14. }
  15. type BalancingPrincipleTarget interface {
  16. GetPrincipleTarget([]string) []string
  17. }
  18. type RoundRobinStrategy struct {
  19. FallbackTag string
  20. ctx context.Context
  21. observatory extension.Observatory
  22. mu sync.Mutex
  23. index int
  24. }
  25. func (s *RoundRobinStrategy) InjectContext(ctx context.Context) {
  26. s.ctx = ctx
  27. }
  28. func (s *RoundRobinStrategy) GetPrincipleTarget(strings []string) []string {
  29. return strings
  30. }
  31. func (s *RoundRobinStrategy) PickOutbound(tags []string) string {
  32. if len(s.FallbackTag) > 0 && s.observatory == nil {
  33. common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
  34. s.observatory = observatory
  35. return nil
  36. }))
  37. }
  38. if s.observatory != nil {
  39. observeReport, err := s.observatory.GetObservation(s.ctx)
  40. if err == nil {
  41. aliveTags := make([]string, 0)
  42. if result, ok := observeReport.(*observatory.ObservationResult); ok {
  43. status := result.Status
  44. statusMap := make(map[string]*observatory.OutboundStatus)
  45. for _, outboundStatus := range status {
  46. statusMap[outboundStatus.OutboundTag] = outboundStatus
  47. }
  48. for _, candidate := range tags {
  49. if outboundStatus, found := statusMap[candidate]; found {
  50. if outboundStatus.Alive {
  51. aliveTags = append(aliveTags, candidate)
  52. }
  53. } else {
  54. // unfound candidate is considered alive
  55. aliveTags = append(aliveTags, candidate)
  56. }
  57. }
  58. tags = aliveTags
  59. }
  60. }
  61. }
  62. n := len(tags)
  63. if n == 0 {
  64. // goes to fallbackTag
  65. return ""
  66. }
  67. s.mu.Lock()
  68. defer s.mu.Unlock()
  69. tag := tags[s.index%n]
  70. s.index = (s.index + 1) % n
  71. return tag
  72. }
  73. type Balancer struct {
  74. selectors []string
  75. strategy BalancingStrategy
  76. ohm outbound.Manager
  77. fallbackTag string
  78. override override
  79. }
  80. // PickOutbound picks the tag of a outbound
  81. func (b *Balancer) PickOutbound() (string, error) {
  82. candidates, err := b.SelectOutbounds()
  83. if err != nil {
  84. if b.fallbackTag != "" {
  85. errors.LogInfo(context.Background(), "fallback to [", b.fallbackTag, "], due to error: ", err)
  86. return b.fallbackTag, nil
  87. }
  88. return "", err
  89. }
  90. var tag string
  91. if o := b.override.Get(); o != "" {
  92. tag = o
  93. } else {
  94. tag = b.strategy.PickOutbound(candidates)
  95. }
  96. if tag == "" {
  97. if b.fallbackTag != "" {
  98. errors.LogInfo(context.Background(), "fallback to [", b.fallbackTag, "], due to empty tag returned")
  99. return b.fallbackTag, nil
  100. }
  101. // will use default handler
  102. return "", errors.New("balancing strategy returns empty tag")
  103. }
  104. return tag, nil
  105. }
  106. func (b *Balancer) InjectContext(ctx context.Context) {
  107. if contextReceiver, ok := b.strategy.(extension.ContextReceiver); ok {
  108. contextReceiver.InjectContext(ctx)
  109. }
  110. }
  111. // SelectOutbounds select outbounds with selectors of the Balancer
  112. func (b *Balancer) SelectOutbounds() ([]string, error) {
  113. hs, ok := b.ohm.(outbound.HandlerSelector)
  114. if !ok {
  115. return nil, errors.New("outbound.Manager is not a HandlerSelector")
  116. }
  117. tags := hs.Select(b.selectors)
  118. return tags, nil
  119. }
  120. // GetPrincipleTarget implements routing.BalancerPrincipleTarget
  121. func (r *Router) GetPrincipleTarget(tag string) ([]string, error) {
  122. if b, ok := r.balancers[tag]; ok {
  123. if s, ok := b.strategy.(BalancingPrincipleTarget); ok {
  124. candidates, err := b.SelectOutbounds()
  125. if err != nil {
  126. return nil, errors.New("unable to select outbounds").Base(err)
  127. }
  128. return s.GetPrincipleTarget(candidates), nil
  129. }
  130. return nil, errors.New("unsupported GetPrincipleTarget")
  131. }
  132. return nil, errors.New("cannot find tag")
  133. }
  134. // SetOverrideTarget implements routing.BalancerOverrider
  135. func (r *Router) SetOverrideTarget(tag, target string) error {
  136. if b, ok := r.balancers[tag]; ok {
  137. b.override.Put(target)
  138. return nil
  139. }
  140. return errors.New("cannot find tag")
  141. }
  142. // GetOverrideTarget implements routing.BalancerOverrider
  143. func (r *Router) GetOverrideTarget(tag string) (string, error) {
  144. if b, ok := r.balancers[tag]; ok {
  145. return b.override.Get(), nil
  146. }
  147. return "", errors.New("cannot find tag")
  148. }