| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 | 
							- package burst
 
- import (
 
- 	"context"
 
- 	"fmt"
 
- 	"strings"
 
- 	"sync"
 
- 	"time"
 
- 	"github.com/xtls/xray-core/common/dice"
 
- )
 
// HealthPingSettings is the normalized configuration used by HealthPing.
type HealthPingSettings struct {
	// Destination is the URL that every outbound is pinged against.
	Destination string `json:"destination"`

	// Connectivity is an optional URL pinged with a direct client after a
	// failed check to tell "outbound is bad" from "network is down". When it
	// is empty the network is always assumed to be up.
	Connectivity string `json:"connectivity"`

	// Interval is multiplied by SamplingCount to obtain the length of one
	// scheduled sampling period.
	Interval time.Duration `json:"interval"`

	// SamplingCount is the number of pings issued per tag in each scheduled
	// sampling period.
	SamplingCount int `json:"sampling"`

	// Timeout is handed to every ping client created by the checker.
	Timeout time.Duration `json:"timeout"`
}
 
- // HealthPing is the health checker for balancers
 
- type HealthPing struct {
 
- 	ctx         context.Context
 
- 	access      sync.Mutex
 
- 	ticker      *time.Ticker
 
- 	tickerClose chan struct{}
 
- 	Settings *HealthPingSettings
 
- 	Results  map[string]*HealthPingRTTS
 
- }
 
- // NewHealthPing creates a new HealthPing with settings
 
- func NewHealthPing(ctx context.Context, config *HealthPingConfig) *HealthPing {
 
- 	settings := &HealthPingSettings{}
 
- 	if config != nil {
 
- 		settings = &HealthPingSettings{
 
- 			Connectivity:  strings.TrimSpace(config.Connectivity),
 
- 			Destination:   strings.TrimSpace(config.Destination),
 
- 			Interval:      time.Duration(config.Interval),
 
- 			SamplingCount: int(config.SamplingCount),
 
- 			Timeout:       time.Duration(config.Timeout),
 
- 		}
 
- 	}
 
- 	if settings.Destination == "" {
 
- 		// Destination URL, need 204 for success return default to chromium
 
- 		// https://github.com/chromium/chromium/blob/main/components/safety_check/url_constants.cc#L10
 
- 		// https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/safety_check/url_constants.cc#10
 
- 		settings.Destination = "https://connectivitycheck.gstatic.com/generate_204"
 
- 	}
 
- 	if settings.Interval == 0 {
 
- 		settings.Interval = time.Duration(1) * time.Minute
 
- 	} else if settings.Interval < 10 {
 
- 		newError("health check interval is too small, 10s is applied").AtWarning().WriteToLog()
 
- 		settings.Interval = time.Duration(10) * time.Second
 
- 	}
 
- 	if settings.SamplingCount <= 0 {
 
- 		settings.SamplingCount = 10
 
- 	}
 
- 	if settings.Timeout <= 0 {
 
- 		// results are saved after all health pings finish,
 
- 		// a larger timeout could possibly makes checks run longer
 
- 		settings.Timeout = time.Duration(5) * time.Second
 
- 	}
 
- 	return &HealthPing{
 
- 		ctx:      ctx,
 
- 		Settings: settings,
 
- 		Results:  nil,
 
- 	}
 
- }
 
- // StartScheduler implements the HealthChecker
 
- func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
 
- 	if h.ticker != nil {
 
- 		return
 
- 	}
 
- 	interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
 
- 	ticker := time.NewTicker(interval)
 
- 	tickerClose := make(chan struct{})
 
- 	h.ticker = ticker
 
- 	h.tickerClose = tickerClose
 
- 	go func() {
 
- 		for {
 
- 			go func() {
 
- 				tags, err := selector()
 
- 				if err != nil {
 
- 					newError("error select outbounds for scheduled health check: ", err).AtWarning().WriteToLog()
 
- 					return
 
- 				}
 
- 				h.doCheck(tags, interval, h.Settings.SamplingCount)
 
- 				h.Cleanup(tags)
 
- 			}()
 
- 			select {
 
- 			case <-ticker.C:
 
- 				continue
 
- 			case <-tickerClose:
 
- 				return
 
- 			}
 
- 		}
 
- 	}()
 
- }
 
- // StopScheduler implements the HealthChecker
 
- func (h *HealthPing) StopScheduler() {
 
- 	if h.ticker == nil {
 
- 		return
 
- 	}
 
- 	h.ticker.Stop()
 
- 	h.ticker = nil
 
- 	close(h.tickerClose)
 
- 	h.tickerClose = nil
 
- }
 
- // Check implements the HealthChecker
 
- func (h *HealthPing) Check(tags []string) error {
 
- 	if len(tags) == 0 {
 
- 		return nil
 
- 	}
 
- 	newError("perform one-time health check for tags ", tags).AtInfo().WriteToLog()
 
- 	h.doCheck(tags, 0, 1)
 
- 	return nil
 
- }
 
// rtt is the outcome of a single ping, reported by doCheck's ping callbacks.
type rtt struct {
	// handler is the outbound tag that was pinged.
	handler string

	// value is the measured delay on success. doCheck sets it to 0 when the
	// network is considered down and to rttFailed when the ping itself failed.
	value time.Duration
}
 
- // doCheck performs the 'rounds' amount checks in given 'duration'. You should make
 
- // sure all tags are valid for current balancer
 
- func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
 
- 	count := len(tags) * rounds
 
- 	if count == 0 {
 
- 		return
 
- 	}
 
- 	ch := make(chan *rtt, count)
 
- 	for _, tag := range tags {
 
- 		handler := tag
 
- 		client := newPingClient(
 
- 			h.ctx,
 
- 			h.Settings.Destination,
 
- 			h.Settings.Timeout,
 
- 			handler,
 
- 		)
 
- 		for i := 0; i < rounds; i++ {
 
- 			delay := time.Duration(0)
 
- 			if duration > 0 {
 
- 				delay = time.Duration(dice.Roll(int(duration)))
 
- 			}
 
- 			time.AfterFunc(delay, func() {
 
- 				newError("checking ", handler).AtDebug().WriteToLog()
 
- 				delay, err := client.MeasureDelay()
 
- 				if err == nil {
 
- 					ch <- &rtt{
 
- 						handler: handler,
 
- 						value:   delay,
 
- 					}
 
- 					return
 
- 				}
 
- 				if !h.checkConnectivity() {
 
- 					newError("network is down").AtWarning().WriteToLog()
 
- 					ch <- &rtt{
 
- 						handler: handler,
 
- 						value:   0,
 
- 					}
 
- 					return
 
- 				}
 
- 				newError(fmt.Sprintf(
 
- 					"error ping %s with %s: %s",
 
- 					h.Settings.Destination,
 
- 					handler,
 
- 					err,
 
- 				)).AtWarning().WriteToLog()
 
- 				ch <- &rtt{
 
- 					handler: handler,
 
- 					value:   rttFailed,
 
- 				}
 
- 			})
 
- 		}
 
- 	}
 
- 	for i := 0; i < count; i++ {
 
- 		rtt := <-ch
 
- 		if rtt.value > 0 {
 
- 			// should not put results when network is down
 
- 			h.PutResult(rtt.handler, rtt.value)
 
- 		}
 
- 	}
 
- }
 
- // PutResult put a ping rtt to results
 
- func (h *HealthPing) PutResult(tag string, rtt time.Duration) {
 
- 	h.access.Lock()
 
- 	defer h.access.Unlock()
 
- 	if h.Results == nil {
 
- 		h.Results = make(map[string]*HealthPingRTTS)
 
- 	}
 
- 	r, ok := h.Results[tag]
 
- 	if !ok {
 
- 		// validity is 2 times to sampling period, since the check are
 
- 		// distributed in the time line randomly, in extreme cases,
 
- 		// previous checks are distributed on the left, and latters
 
- 		// on the right
 
- 		validity := h.Settings.Interval * time.Duration(h.Settings.SamplingCount) * 2
 
- 		r = NewHealthPingResult(h.Settings.SamplingCount, validity)
 
- 		h.Results[tag] = r
 
- 	}
 
- 	r.Put(rtt)
 
- }
 
- // Cleanup removes results of removed handlers,
 
- // tags should be all valid tags of the Balancer now
 
- func (h *HealthPing) Cleanup(tags []string) {
 
- 	h.access.Lock()
 
- 	defer h.access.Unlock()
 
- 	for tag := range h.Results {
 
- 		found := false
 
- 		for _, v := range tags {
 
- 			if tag == v {
 
- 				found = true
 
- 				break
 
- 			}
 
- 		}
 
- 		if !found {
 
- 			delete(h.Results, tag)
 
- 		}
 
- 	}
 
- }
 
- // checkConnectivity checks the network connectivity, it returns
 
- // true if network is good or "connectivity check url" not set
 
- func (h *HealthPing) checkConnectivity() bool {
 
- 	if h.Settings.Connectivity == "" {
 
- 		return true
 
- 	}
 
- 	tester := newDirectPingClient(
 
- 		h.Settings.Connectivity,
 
- 		h.Settings.Timeout,
 
- 	)
 
- 	if _, err := tester.MeasureDelay(); err != nil {
 
- 		return false
 
- 	}
 
- 	return true
 
- }
 
 
  |