balancing.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package router
  2. import (
  3. "context"
  4. sync "sync"
  5. "github.com/xtls/xray-core/app/observatory"
  6. "github.com/xtls/xray-core/common"
  7. "github.com/xtls/xray-core/core"
  8. "github.com/xtls/xray-core/features/extension"
  9. "github.com/xtls/xray-core/features/outbound"
  10. )
  // BalancingStrategy picks a single outbound tag out of a list of candidates.
  type BalancingStrategy interface {
  	// PickOutbound returns the tag chosen from candidates. Balancer.PickOutbound
  	// treats an empty string as "nothing picked" and falls back accordingly.
  	PickOutbound(candidates []string) string
  }
  // BalancingPrincipleTarget is an optional capability of a balancing strategy;
  // Router.GetPrincipleTarget type-asserts a balancer's strategy against it.
  type BalancingPrincipleTarget interface {
  	// GetPrincipleTarget receives the candidate tags selected for a balancer
  	// and returns the tags the strategy reports as its principle targets.
  	GetPrincipleTarget(candidates []string) []string
  }
  17. type RoundRobinStrategy struct {
  18. FallbackTag string
  19. ctx context.Context
  20. observatory extension.Observatory
  21. mu sync.Mutex
  22. index int
  23. }
  24. func (s *RoundRobinStrategy) InjectContext(ctx context.Context) {
  25. s.ctx = ctx
  26. }
  27. func (s *RoundRobinStrategy) GetPrincipleTarget(strings []string) []string {
  28. return strings
  29. }
  30. func (s *RoundRobinStrategy) PickOutbound(tags []string) string {
  31. if len(s.FallbackTag) > 0 && s.observatory == nil {
  32. common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
  33. s.observatory = observatory
  34. return nil
  35. }))
  36. }
  37. if s.observatory != nil {
  38. observeReport, err := s.observatory.GetObservation(s.ctx)
  39. if err == nil {
  40. aliveTags := make([]string, 0)
  41. if result, ok := observeReport.(*observatory.ObservationResult); ok {
  42. status := result.Status
  43. statusMap := make(map[string]*observatory.OutboundStatus)
  44. for _, outboundStatus := range status {
  45. statusMap[outboundStatus.OutboundTag] = outboundStatus
  46. }
  47. for _, candidate := range tags {
  48. if outboundStatus, found := statusMap[candidate]; found {
  49. if outboundStatus.Alive {
  50. aliveTags = append(aliveTags, candidate)
  51. }
  52. } else {
  53. // unfound candidate is considered alive
  54. aliveTags = append(aliveTags, candidate)
  55. }
  56. }
  57. tags = aliveTags
  58. }
  59. }
  60. }
  61. n := len(tags)
  62. if n == 0 {
  63. // goes to fallbackTag
  64. return ""
  65. }
  66. s.mu.Lock()
  67. defer s.mu.Unlock()
  68. tag := tags[s.index%n]
  69. s.index = (s.index + 1) % n
  70. return tag
  71. }
  72. type Balancer struct {
  73. selectors []string
  74. strategy BalancingStrategy
  75. ohm outbound.Manager
  76. fallbackTag string
  77. override override
  78. }
  79. // PickOutbound picks the tag of a outbound
  80. func (b *Balancer) PickOutbound() (string, error) {
  81. candidates, err := b.SelectOutbounds()
  82. if err != nil {
  83. if b.fallbackTag != "" {
  84. newError("fallback to [", b.fallbackTag, "], due to error: ", err).AtInfo().WriteToLog()
  85. return b.fallbackTag, nil
  86. }
  87. return "", err
  88. }
  89. var tag string
  90. if o := b.override.Get(); o != "" {
  91. tag = o
  92. } else {
  93. tag = b.strategy.PickOutbound(candidates)
  94. }
  95. if tag == "" {
  96. if b.fallbackTag != "" {
  97. newError("fallback to [", b.fallbackTag, "], due to empty tag returned").AtInfo().WriteToLog()
  98. return b.fallbackTag, nil
  99. }
  100. // will use default handler
  101. return "", newError("balancing strategy returns empty tag")
  102. }
  103. return tag, nil
  104. }
  105. func (b *Balancer) InjectContext(ctx context.Context) {
  106. if contextReceiver, ok := b.strategy.(extension.ContextReceiver); ok {
  107. contextReceiver.InjectContext(ctx)
  108. }
  109. }
  110. // SelectOutbounds select outbounds with selectors of the Balancer
  111. func (b *Balancer) SelectOutbounds() ([]string, error) {
  112. hs, ok := b.ohm.(outbound.HandlerSelector)
  113. if !ok {
  114. return nil, newError("outbound.Manager is not a HandlerSelector")
  115. }
  116. tags := hs.Select(b.selectors)
  117. return tags, nil
  118. }
  119. // GetPrincipleTarget implements routing.BalancerPrincipleTarget
  120. func (r *Router) GetPrincipleTarget(tag string) ([]string, error) {
  121. if b, ok := r.balancers[tag]; ok {
  122. if s, ok := b.strategy.(BalancingPrincipleTarget); ok {
  123. candidates, err := b.SelectOutbounds()
  124. if err != nil {
  125. return nil, newError("unable to select outbounds").Base(err)
  126. }
  127. return s.GetPrincipleTarget(candidates), nil
  128. }
  129. return nil, newError("unsupported GetPrincipleTarget")
  130. }
  131. return nil, newError("cannot find tag")
  132. }
  133. // SetOverrideTarget implements routing.BalancerOverrider
  134. func (r *Router) SetOverrideTarget(tag, target string) error {
  135. if b, ok := r.balancers[tag]; ok {
  136. b.override.Put(target)
  137. return nil
  138. }
  139. return newError("cannot find tag")
  140. }
  141. // GetOverrideTarget implements routing.BalancerOverrider
  142. func (r *Router) GetOverrideTarget(tag string) (string, error) {
  143. if b, ok := r.balancers[tag]; ok {
  144. return b.override.Get(), nil
  145. }
  146. return "", newError("cannot find tag")
  147. }