observer.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package observatory
  2. import (
  3. "context"
  4. "net"
  5. "net/http"
  6. "net/url"
  7. "sort"
  8. "sync"
  9. "time"
  10. "github.com/xtls/xray-core/common"
  11. "github.com/xtls/xray-core/common/errors"
  12. v2net "github.com/xtls/xray-core/common/net"
  13. "github.com/xtls/xray-core/common/session"
  14. "github.com/xtls/xray-core/common/signal/done"
  15. "github.com/xtls/xray-core/common/task"
  16. "github.com/xtls/xray-core/common/utils"
  17. "github.com/xtls/xray-core/core"
  18. "github.com/xtls/xray-core/features/extension"
  19. "github.com/xtls/xray-core/features/outbound"
  20. "github.com/xtls/xray-core/features/routing"
  21. "github.com/xtls/xray-core/transport/internet/tagged"
  22. "google.golang.org/protobuf/proto"
  23. )
  24. type Observer struct {
  25. config *Config
  26. ctx context.Context
  27. statusLock sync.Mutex
  28. status []*OutboundStatus
  29. finished *done.Instance
  30. ohm outbound.Manager
  31. dispatcher routing.Dispatcher
  32. }
  33. func (o *Observer) GetObservation(ctx context.Context) (proto.Message, error) {
  34. return &ObservationResult{Status: o.status}, nil
  35. }
  36. func (o *Observer) Type() interface{} {
  37. return extension.ObservatoryType()
  38. }
  39. func (o *Observer) Start() error {
  40. if o.config != nil && len(o.config.SubjectSelector) != 0 {
  41. o.finished = done.New()
  42. go o.background()
  43. }
  44. return nil
  45. }
  46. func (o *Observer) Close() error {
  47. if o.finished != nil {
  48. return o.finished.Close()
  49. }
  50. return nil
  51. }
  52. func (o *Observer) background() {
  53. for !o.finished.Done() {
  54. hs, ok := o.ohm.(outbound.HandlerSelector)
  55. if !ok {
  56. errors.LogInfo(o.ctx, "outbound.Manager is not a HandlerSelector")
  57. return
  58. }
  59. outbounds := hs.Select(o.config.SubjectSelector)
  60. o.updateStatus(outbounds)
  61. sleepTime := time.Second * 10
  62. if o.config.ProbeInterval != 0 {
  63. sleepTime = time.Duration(o.config.ProbeInterval)
  64. }
  65. if !o.config.EnableConcurrency {
  66. sort.Strings(outbounds)
  67. for _, v := range outbounds {
  68. result := o.probe(v)
  69. o.updateStatusForResult(v, &result)
  70. if o.finished.Done() {
  71. return
  72. }
  73. time.Sleep(sleepTime)
  74. }
  75. continue
  76. }
  77. ch := make(chan struct{}, len(outbounds))
  78. for _, v := range outbounds {
  79. go func(v string) {
  80. result := o.probe(v)
  81. o.updateStatusForResult(v, &result)
  82. ch <- struct{}{}
  83. }(v)
  84. }
  85. for range outbounds {
  86. select {
  87. case <-ch:
  88. case <-o.finished.Wait():
  89. return
  90. }
  91. }
  92. time.Sleep(sleepTime)
  93. }
  94. }
  95. func (o *Observer) updateStatus(outbounds []string) {
  96. o.statusLock.Lock()
  97. defer o.statusLock.Unlock()
  98. // TODO should remove old inbound that is removed
  99. _ = outbounds
  100. }
  101. func (o *Observer) probe(outbound string) ProbeResult {
  102. errorCollectorForRequest := newErrorCollector()
  103. httpTransport := http.Transport{
  104. Proxy: func(*http.Request) (*url.URL, error) {
  105. return nil, nil
  106. },
  107. DialContext: func(ctx context.Context, network string, addr string) (net.Conn, error) {
  108. var connection net.Conn
  109. taskErr := task.Run(ctx, func() error {
  110. // MUST use Xray's built in context system
  111. dest, err := v2net.ParseDestination(network + ":" + addr)
  112. if err != nil {
  113. return errors.New("cannot understand address").Base(err)
  114. }
  115. trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
  116. conn, err := tagged.Dialer(trackedCtx, o.dispatcher, dest, outbound)
  117. if err != nil {
  118. return errors.New("cannot dial remote address ", dest).Base(err)
  119. }
  120. connection = conn
  121. return nil
  122. })
  123. if taskErr != nil {
  124. return nil, errors.New("cannot finish connection").Base(taskErr)
  125. }
  126. return connection, nil
  127. },
  128. TLSHandshakeTimeout: time.Second * 5,
  129. }
  130. httpClient := &http.Client{
  131. Transport: &httpTransport,
  132. CheckRedirect: func(req *http.Request, via []*http.Request) error {
  133. return http.ErrUseLastResponse
  134. },
  135. Jar: nil,
  136. Timeout: time.Second * 5,
  137. }
  138. var GETTime time.Duration
  139. err := task.Run(o.ctx, func() error {
  140. startTime := time.Now()
  141. probeURL := "https://www.google.com/generate_204"
  142. if o.config.ProbeUrl != "" {
  143. probeURL = o.config.ProbeUrl
  144. }
  145. req, _ := http.NewRequest(http.MethodGet, probeURL, nil)
  146. req.Header.Set("User-Agent", utils.ChromeUA)
  147. response, err := httpClient.Do(req)
  148. if err != nil {
  149. return errors.New("outbound failed to relay connection").Base(err)
  150. }
  151. if response.Body != nil {
  152. response.Body.Close()
  153. }
  154. endTime := time.Now()
  155. GETTime = endTime.Sub(startTime)
  156. return nil
  157. })
  158. if err != nil {
  159. var errorMessage = "the outbound " + outbound + " is dead: GET request failed:" + err.Error() + "with outbound handler report underlying connection failed"
  160. errors.LogInfoInner(o.ctx, errorCollectorForRequest.UnderlyingError(), errorMessage)
  161. return ProbeResult{Alive: false, LastErrorReason: errorMessage}
  162. }
  163. errors.LogInfo(o.ctx, "the outbound ", outbound, " is alive:", GETTime.Seconds())
  164. return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
  165. }
  166. func (o *Observer) updateStatusForResult(outbound string, result *ProbeResult) {
  167. o.statusLock.Lock()
  168. defer o.statusLock.Unlock()
  169. var status *OutboundStatus
  170. if location := o.findStatusLocationLockHolderOnly(outbound); location != -1 {
  171. status = o.status[location]
  172. } else {
  173. status = &OutboundStatus{}
  174. o.status = append(o.status, status)
  175. }
  176. status.LastTryTime = time.Now().Unix()
  177. status.OutboundTag = outbound
  178. status.Alive = result.Alive
  179. if result.Alive {
  180. status.Delay = result.Delay
  181. status.LastSeenTime = status.LastTryTime
  182. status.LastErrorReason = ""
  183. } else {
  184. status.LastErrorReason = result.LastErrorReason
  185. status.Delay = 99999999
  186. }
  187. }
  188. func (o *Observer) findStatusLocationLockHolderOnly(outbound string) int {
  189. for i, v := range o.status {
  190. if v.OutboundTag == outbound {
  191. return i
  192. }
  193. }
  194. return -1
  195. }
  196. func New(ctx context.Context, config *Config) (*Observer, error) {
  197. var outboundManager outbound.Manager
  198. var dispatcher routing.Dispatcher
  199. err := core.RequireFeatures(ctx, func(om outbound.Manager, rd routing.Dispatcher) {
  200. outboundManager = om
  201. dispatcher = rd
  202. })
  203. if err != nil {
  204. return nil, errors.New("Cannot get depended features").Base(err)
  205. }
  206. return &Observer{
  207. config: config,
  208. ctx: ctx,
  209. ohm: outboundManager,
  210. dispatcher: dispatcher,
  211. }, nil
  212. }
  213. func init() {
  214. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  215. return New(ctx, config.(*Config))
  216. }))
  217. }