outbound.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package metrics
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/xtls/xray-core/common"
  6. "github.com/xtls/xray-core/common/errors"
  7. "github.com/xtls/xray-core/common/net"
  8. "github.com/xtls/xray-core/common/net/cnc"
  9. "github.com/xtls/xray-core/common/signal/done"
  10. "github.com/xtls/xray-core/transport"
  11. )
  12. // OutboundListener is a net.Listener for listening metrics http connections.
  13. type OutboundListener struct {
  14. buffer chan net.Conn
  15. done *done.Instance
  16. }
  17. func (l *OutboundListener) add(conn net.Conn) {
  18. select {
  19. case l.buffer <- conn:
  20. case <-l.done.Wait():
  21. conn.Close()
  22. default:
  23. conn.Close()
  24. }
  25. }
  26. // Accept implements net.Listener.
  27. func (l *OutboundListener) Accept() (net.Conn, error) {
  28. select {
  29. case <-l.done.Wait():
  30. return nil, errors.New("listen closed")
  31. case c := <-l.buffer:
  32. return c, nil
  33. }
  34. }
  35. // Close implement net.Listener.
  36. func (l *OutboundListener) Close() error {
  37. common.Must(l.done.Close())
  38. L:
  39. for {
  40. select {
  41. case c := <-l.buffer:
  42. c.Close()
  43. default:
  44. break L
  45. }
  46. }
  47. return nil
  48. }
  49. // Addr implements net.Listener.
  50. func (l *OutboundListener) Addr() net.Addr {
  51. return &net.TCPAddr{
  52. IP: net.IP{0, 0, 0, 0},
  53. Port: 0,
  54. }
  55. }
  56. // Outbound is an outbound.Handler that handles metrics http connections.
  57. type Outbound struct {
  58. tag string
  59. listener *OutboundListener
  60. access sync.RWMutex
  61. closed bool
  62. }
  63. // Dispatch implements outbound.Handler.
  64. func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
  65. co.access.RLock()
  66. if co.closed {
  67. common.Interrupt(link.Reader)
  68. common.Interrupt(link.Writer)
  69. co.access.RUnlock()
  70. return
  71. }
  72. closeSignal := done.New()
  73. c := cnc.NewConnection(cnc.ConnectionInputMulti(link.Writer), cnc.ConnectionOutputMulti(link.Reader), cnc.ConnectionOnClose(closeSignal))
  74. co.listener.add(c)
  75. co.access.RUnlock()
  76. <-closeSignal.Wait()
  77. }
  78. // Tag implements outbound.Handler.
  79. func (co *Outbound) Tag() string {
  80. return co.tag
  81. }
  82. // Start implements common.Runnable.
  83. func (co *Outbound) Start() error {
  84. co.access.Lock()
  85. co.closed = false
  86. co.access.Unlock()
  87. return nil
  88. }
  89. // Close implements common.Closable.
  90. func (co *Outbound) Close() error {
  91. co.access.Lock()
  92. defer co.access.Unlock()
  93. co.closed = true
  94. return co.listener.Close()
  95. }