healthping.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package burst
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/xtls/xray-core/common/dice"
  9. )
  // HealthPingSettings is the normalized configuration of a HealthPing checker.
// NewHealthPing fills in defaults for Destination, Interval, SamplingCount and
// Timeout when they are unset; Connectivity has no default.
type HealthPingSettings struct {
	// Destination is the URL each outbound is probed with.
	Destination string `json:"destination"`
	// Connectivity is an optional URL probed with a direct ping client to tell
	// whether the local network is up; empty disables that check.
	Connectivity string `json:"connectivity"`
	// Interval is the average spacing between two pings of one outbound: a
	// scheduling period lasts Interval*SamplingCount and holds SamplingCount
	// pings per outbound.
	Interval time.Duration `json:"interval"`
	// SamplingCount is the number of pings per outbound in each scheduling
	// period; it is also passed to NewHealthPingResult (presumably as the
	// sample capacity — confirm).
	SamplingCount int `json:"sampling"`
	// Timeout is passed as the timeout of every ping client.
	Timeout time.Duration `json:"timeout"`
}
  18. // HealthPing is the health checker for balancers
  19. type HealthPing struct {
  20. ctx context.Context
  21. access sync.Mutex
  22. ticker *time.Ticker
  23. tickerClose chan struct{}
  24. Settings *HealthPingSettings
  25. Results map[string]*HealthPingRTTS
  26. }
  27. // NewHealthPing creates a new HealthPing with settings
  28. func NewHealthPing(ctx context.Context, config *HealthPingConfig) *HealthPing {
  29. settings := &HealthPingSettings{}
  30. if config != nil {
  31. settings = &HealthPingSettings{
  32. Connectivity: strings.TrimSpace(config.Connectivity),
  33. Destination: strings.TrimSpace(config.Destination),
  34. Interval: time.Duration(config.Interval),
  35. SamplingCount: int(config.SamplingCount),
  36. Timeout: time.Duration(config.Timeout),
  37. }
  38. }
  39. if settings.Destination == "" {
  40. // Destination URL, need 204 for success return default to chromium
  41. // https://github.com/chromium/chromium/blob/main/components/safety_check/url_constants.cc#L10
  42. // https://chromium.googlesource.com/chromium/src/+/refs/heads/main/components/safety_check/url_constants.cc#10
  43. settings.Destination = "https://connectivitycheck.gstatic.com/generate_204"
  44. }
  45. if settings.Interval == 0 {
  46. settings.Interval = time.Duration(1) * time.Minute
  47. } else if settings.Interval < 10 {
  48. newError("health check interval is too small, 10s is applied").AtWarning().WriteToLog()
  49. settings.Interval = time.Duration(10) * time.Second
  50. }
  51. if settings.SamplingCount <= 0 {
  52. settings.SamplingCount = 10
  53. }
  54. if settings.Timeout <= 0 {
  55. // results are saved after all health pings finish,
  56. // a larger timeout could possibly makes checks run longer
  57. settings.Timeout = time.Duration(5) * time.Second
  58. }
  59. return &HealthPing{
  60. ctx: ctx,
  61. Settings: settings,
  62. Results: nil,
  63. }
  64. }
  65. // StartScheduler implements the HealthChecker
  66. func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
  67. if h.ticker != nil {
  68. return
  69. }
  70. interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
  71. ticker := time.NewTicker(interval)
  72. tickerClose := make(chan struct{})
  73. h.ticker = ticker
  74. h.tickerClose = tickerClose
  75. go func() {
  76. for {
  77. go func() {
  78. tags, err := selector()
  79. if err != nil {
  80. newError("error select outbounds for scheduled health check: ", err).AtWarning().WriteToLog()
  81. return
  82. }
  83. h.doCheck(tags, interval, h.Settings.SamplingCount)
  84. h.Cleanup(tags)
  85. }()
  86. select {
  87. case <-ticker.C:
  88. continue
  89. case <-tickerClose:
  90. return
  91. }
  92. }
  93. }()
  94. }
  95. // StopScheduler implements the HealthChecker
  96. func (h *HealthPing) StopScheduler() {
  97. if h.ticker == nil {
  98. return
  99. }
  100. h.ticker.Stop()
  101. h.ticker = nil
  102. close(h.tickerClose)
  103. h.tickerClose = nil
  104. }
  105. // Check implements the HealthChecker
  106. func (h *HealthPing) Check(tags []string) error {
  107. if len(tags) == 0 {
  108. return nil
  109. }
  110. newError("perform one-time health check for tags ", tags).AtInfo().WriteToLog()
  111. h.doCheck(tags, 0, 1)
  112. return nil
  113. }
  // rtt carries the outcome of one ping from a probe callback to the
// collecting loop in doCheck.
type rtt struct {
	handler string        // tag of the probed outbound
	value   time.Duration // measured delay; 0 = network down (dropped), rttFailed = ping failed
}
  118. // doCheck performs the 'rounds' amount checks in given 'duration'. You should make
  119. // sure all tags are valid for current balancer
  120. func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
  121. count := len(tags) * rounds
  122. if count == 0 {
  123. return
  124. }
  125. ch := make(chan *rtt, count)
  126. for _, tag := range tags {
  127. handler := tag
  128. client := newPingClient(
  129. h.ctx,
  130. h.Settings.Destination,
  131. h.Settings.Timeout,
  132. handler,
  133. )
  134. for i := 0; i < rounds; i++ {
  135. delay := time.Duration(0)
  136. if duration > 0 {
  137. delay = time.Duration(dice.Roll(int(duration)))
  138. }
  139. time.AfterFunc(delay, func() {
  140. newError("checking ", handler).AtDebug().WriteToLog()
  141. delay, err := client.MeasureDelay()
  142. if err == nil {
  143. ch <- &rtt{
  144. handler: handler,
  145. value: delay,
  146. }
  147. return
  148. }
  149. if !h.checkConnectivity() {
  150. newError("network is down").AtWarning().WriteToLog()
  151. ch <- &rtt{
  152. handler: handler,
  153. value: 0,
  154. }
  155. return
  156. }
  157. newError(fmt.Sprintf(
  158. "error ping %s with %s: %s",
  159. h.Settings.Destination,
  160. handler,
  161. err,
  162. )).AtWarning().WriteToLog()
  163. ch <- &rtt{
  164. handler: handler,
  165. value: rttFailed,
  166. }
  167. })
  168. }
  169. }
  170. for i := 0; i < count; i++ {
  171. rtt := <-ch
  172. if rtt.value > 0 {
  173. // should not put results when network is down
  174. h.PutResult(rtt.handler, rtt.value)
  175. }
  176. }
  177. }
  178. // PutResult put a ping rtt to results
  179. func (h *HealthPing) PutResult(tag string, rtt time.Duration) {
  180. h.access.Lock()
  181. defer h.access.Unlock()
  182. if h.Results == nil {
  183. h.Results = make(map[string]*HealthPingRTTS)
  184. }
  185. r, ok := h.Results[tag]
  186. if !ok {
  187. // validity is 2 times to sampling period, since the check are
  188. // distributed in the time line randomly, in extreme cases,
  189. // previous checks are distributed on the left, and latters
  190. // on the right
  191. validity := h.Settings.Interval * time.Duration(h.Settings.SamplingCount) * 2
  192. r = NewHealthPingResult(h.Settings.SamplingCount, validity)
  193. h.Results[tag] = r
  194. }
  195. r.Put(rtt)
  196. }
  197. // Cleanup removes results of removed handlers,
  198. // tags should be all valid tags of the Balancer now
  199. func (h *HealthPing) Cleanup(tags []string) {
  200. h.access.Lock()
  201. defer h.access.Unlock()
  202. for tag := range h.Results {
  203. found := false
  204. for _, v := range tags {
  205. if tag == v {
  206. found = true
  207. break
  208. }
  209. }
  210. if !found {
  211. delete(h.Results, tag)
  212. }
  213. }
  214. }
  215. // checkConnectivity checks the network connectivity, it returns
  216. // true if network is good or "connectivity check url" not set
  217. func (h *HealthPing) checkConnectivity() bool {
  218. if h.Settings.Connectivity == "" {
  219. return true
  220. }
  221. tester := newDirectPingClient(
  222. h.Settings.Connectivity,
  223. h.Settings.Timeout,
  224. )
  225. if _, err := tester.MeasureDelay(); err != nil {
  226. return false
  227. }
  228. return true
  229. }