| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- package router
- import (
- "context"
- sync "sync"
- "github.com/xtls/xray-core/app/observatory"
- "github.com/xtls/xray-core/common"
- "github.com/xtls/xray-core/common/errors"
- "github.com/xtls/xray-core/core"
- "github.com/xtls/xray-core/features/extension"
- "github.com/xtls/xray-core/features/outbound"
- )
- type BalancingStrategy interface {
- PickOutbound([]string) string
- }
- type BalancingPrincipleTarget interface {
- GetPrincipleTarget([]string) []string
- }
- type RoundRobinStrategy struct {
- FallbackTag string
- ctx context.Context
- observatory extension.Observatory
- mu sync.Mutex
- index int
- }
- func (s *RoundRobinStrategy) InjectContext(ctx context.Context) {
- s.ctx = ctx
- }
- func (s *RoundRobinStrategy) GetPrincipleTarget(strings []string) []string {
- return strings
- }
- func (s *RoundRobinStrategy) PickOutbound(tags []string) string {
- if len(s.FallbackTag) > 0 && s.observatory == nil {
- common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
- s.observatory = observatory
- return nil
- }))
- }
- if s.observatory != nil {
- observeReport, err := s.observatory.GetObservation(s.ctx)
- if err == nil {
- aliveTags := make([]string, 0)
- if result, ok := observeReport.(*observatory.ObservationResult); ok {
- status := result.Status
- statusMap := make(map[string]*observatory.OutboundStatus)
- for _, outboundStatus := range status {
- statusMap[outboundStatus.OutboundTag] = outboundStatus
- }
- for _, candidate := range tags {
- if outboundStatus, found := statusMap[candidate]; found {
- if outboundStatus.Alive {
- aliveTags = append(aliveTags, candidate)
- }
- } else {
- // unfound candidate is considered alive
- aliveTags = append(aliveTags, candidate)
- }
- }
- tags = aliveTags
- }
- }
- }
- n := len(tags)
- if n == 0 {
- // goes to fallbackTag
- return ""
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- tag := tags[s.index%n]
- s.index = (s.index + 1) % n
- return tag
- }
- type Balancer struct {
- selectors []string
- strategy BalancingStrategy
- ohm outbound.Manager
- fallbackTag string
- override override
- }
- // PickOutbound picks the tag of a outbound
- func (b *Balancer) PickOutbound() (string, error) {
- candidates, err := b.SelectOutbounds()
- if err != nil {
- if b.fallbackTag != "" {
- errors.LogInfo(context.Background(), "fallback to [", b.fallbackTag, "], due to error: ", err)
- return b.fallbackTag, nil
- }
- return "", err
- }
- var tag string
- if o := b.override.Get(); o != "" {
- tag = o
- } else {
- tag = b.strategy.PickOutbound(candidates)
- }
- if tag == "" {
- if b.fallbackTag != "" {
- errors.LogInfo(context.Background(), "fallback to [", b.fallbackTag, "], due to empty tag returned")
- return b.fallbackTag, nil
- }
- // will use default handler
- return "", errors.New("balancing strategy returns empty tag")
- }
- return tag, nil
- }
- func (b *Balancer) InjectContext(ctx context.Context) {
- if contextReceiver, ok := b.strategy.(extension.ContextReceiver); ok {
- contextReceiver.InjectContext(ctx)
- }
- }
- // SelectOutbounds select outbounds with selectors of the Balancer
- func (b *Balancer) SelectOutbounds() ([]string, error) {
- hs, ok := b.ohm.(outbound.HandlerSelector)
- if !ok {
- return nil, errors.New("outbound.Manager is not a HandlerSelector")
- }
- tags := hs.Select(b.selectors)
- return tags, nil
- }
- // GetPrincipleTarget implements routing.BalancerPrincipleTarget
- func (r *Router) GetPrincipleTarget(tag string) ([]string, error) {
- if b, ok := r.balancers[tag]; ok {
- if s, ok := b.strategy.(BalancingPrincipleTarget); ok {
- candidates, err := b.SelectOutbounds()
- if err != nil {
- return nil, errors.New("unable to select outbounds").Base(err)
- }
- return s.GetPrincipleTarget(candidates), nil
- }
- return nil, errors.New("unsupported GetPrincipleTarget")
- }
- return nil, errors.New("cannot find tag")
- }
- // SetOverrideTarget implements routing.BalancerOverrider
- func (r *Router) SetOverrideTarget(tag, target string) error {
- if b, ok := r.balancers[tag]; ok {
- b.override.Put(target)
- return nil
- }
- return errors.New("cannot find tag")
- }
- // GetOverrideTarget implements routing.BalancerOverrider
- func (r *Router) GetOverrideTarget(tag string) (string, error) {
- if b, ok := r.balancers[tag]; ok {
- return b.override.Get(), nil
- }
- return "", errors.New("cannot find tag")
- }
|