Browse Source

Select alive only node when fallbackTag is given

- Apply to random and roundrobin strategy
- Require observatory config

Co-authored-by: Mark Ma <[email protected]>
yuhan6665 1 year ago
parent
commit
84eeb56ae4
4 changed files with 100 additions and 5 deletions
  1. 48 1
      app/router/balancing.go
  2. 2 2
      app/router/config.go
  3. 47 1
      app/router/strategy_random.go
  4. 3 1
      infra/conf/router_test.go

+ 48 - 1
app/router/balancing.go

@@ -4,6 +4,9 @@ import (
 	"context"
 	sync "sync"
 
+	"github.com/xtls/xray-core/app/observatory"
+	"github.com/xtls/xray-core/common"
+	"github.com/xtls/xray-core/core"
 	"github.com/xtls/xray-core/features/extension"
 	"github.com/xtls/xray-core/features/outbound"
 )
@@ -17,14 +20,58 @@ type BalancingPrincipleTarget interface {
 }
 
 type RoundRobinStrategy struct {
+	FallbackTag string
+
+	ctx         context.Context
+	observatory extension.Observatory
 	mu    sync.Mutex
 	index int
 }
 
+func (s *RoundRobinStrategy) InjectContext(ctx context.Context) {
+	s.ctx = ctx
+}
+
+func (s *RoundRobinStrategy) GetPrincipleTarget(strings []string) []string {
+	return strings
+}
+
 func (s *RoundRobinStrategy) PickOutbound(tags []string) string {
+	if len(s.FallbackTag) > 0 && s.observatory == nil {
+		common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
+			s.observatory = observatory
+			return nil
+		}))
+	}
+	if s.observatory != nil {
+		observeReport, err := s.observatory.GetObservation(s.ctx)
+		if err == nil {
+			aliveTags := make([]string, 0)
+			if result, ok := observeReport.(*observatory.ObservationResult); ok {
+				status := result.Status
+				statusMap := make(map[string]*observatory.OutboundStatus)
+				for _, outboundStatus := range status {
+					statusMap[outboundStatus.OutboundTag] = outboundStatus
+				}
+				for _, candidate := range tags {
+					if outboundStatus, found := statusMap[candidate]; found {
+						if outboundStatus.Alive {
+							aliveTags = append(aliveTags, candidate)
+						}
+					} else {
+						// unfound candidate is considered alive
+						aliveTags = append(aliveTags, candidate)
+					}
+				}
+				tags = aliveTags
+			}
+		}
+	}
+
 	n := len(tags)
 	if n == 0 {
-		panic("0 tags")
+		// goes to fallbackTag
+		return ""
 	}
 
 	s.mu.Lock()

+ 2 - 2
app/router/config.go

@@ -135,7 +135,7 @@ func (br *BalancingRule) Build(ohm outbound.Manager, dispatcher routing.Dispatch
 	case "roundrobin":
 		return &Balancer{
 			selectors:   br.OutboundSelector,
-			strategy:    &RoundRobinStrategy{},
+			strategy:    &RoundRobinStrategy{FallbackTag: br.FallbackTag},
 			fallbackTag: br.FallbackTag,
 			ohm:         ohm,
 		}, nil
@@ -162,7 +162,7 @@ func (br *BalancingRule) Build(ohm outbound.Manager, dispatcher routing.Dispatch
 			selectors:   br.OutboundSelector,
 			ohm:         ohm,
 			fallbackTag: br.FallbackTag,
-			strategy:    &RandomStrategy{},
+			strategy:    &RandomStrategy{FallbackTag: br.FallbackTag},
 		}, nil
 	default:
 		return nil, newError("unrecognized balancer type")

+ 47 - 1
app/router/strategy_random.go

@@ -1,17 +1,63 @@
 package router
 
 import (
+	"context"
+
+	"github.com/xtls/xray-core/app/observatory"
+	"github.com/xtls/xray-core/common"
 	"github.com/xtls/xray-core/common/dice"
+	"github.com/xtls/xray-core/core"
+	"github.com/xtls/xray-core/features/extension"
 )
 
 // RandomStrategy represents a random balancing strategy
-type RandomStrategy struct{}
+type RandomStrategy struct{
+	FallbackTag string
+
+	ctx         context.Context
+	observatory extension.Observatory
+}
+
+func (s *RandomStrategy) InjectContext(ctx context.Context) {
+	s.ctx = ctx
+}
 
 func (s *RandomStrategy) GetPrincipleTarget(strings []string) []string {
 	return strings
 }
 
 func (s *RandomStrategy) PickOutbound(candidates []string) string {
+	if len(s.FallbackTag) > 0 && s.observatory == nil {
+		common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
+			s.observatory = observatory
+			return nil
+		}))
+	}
+	if s.observatory != nil {
+		observeReport, err := s.observatory.GetObservation(s.ctx)
+		if err == nil {
+			aliveTags := make([]string, 0)
+			if result, ok := observeReport.(*observatory.ObservationResult); ok {
+				status := result.Status
+				statusMap := make(map[string]*observatory.OutboundStatus)
+				for _, outboundStatus := range status {
+					statusMap[outboundStatus.OutboundTag] = outboundStatus
+				}
+				for _, candidate := range candidates {
+					if outboundStatus, found := statusMap[candidate]; found {
+						if outboundStatus.Alive {
+							aliveTags = append(aliveTags, candidate)
+						}
+					} else {
+						// unfound candidate is considered alive
+						aliveTags = append(aliveTags, candidate)
+					}
+				}
+				candidates = aliveTags
+			}
+		}
+	}
+
 	count := len(candidates)
 	if count == 0 {
 		// goes to fallbackTag

+ 3 - 1
infra/conf/router_test.go

@@ -97,7 +97,8 @@ func TestRouterConfig(t *testing.T) {
 				"balancers": [
 					{
 						"tag": "b1",
-						"selector": ["test"]
+						"selector": ["test"],
+						"fallbackTag": "fall"
 					},
 					{
 						"tag": "b2",
@@ -137,6 +138,7 @@ func TestRouterConfig(t *testing.T) {
 						Tag:              "b1",
 						OutboundSelector: []string{"test"},
 						Strategy:         "random",
+						FallbackTag:      "fall",
 					},
 					{
 						Tag:              "b2",