outbound.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package commander
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/xtls/xray-core/common"
  6. "github.com/xtls/xray-core/common/errors"
  7. "github.com/xtls/xray-core/common/net"
  8. "github.com/xtls/xray-core/common/net/cnc"
  9. "github.com/xtls/xray-core/common/serial"
  10. "github.com/xtls/xray-core/common/signal/done"
  11. "github.com/xtls/xray-core/transport"
  12. )
  13. // OutboundListener is a net.Listener for listening gRPC connections.
  14. type OutboundListener struct {
  15. buffer chan net.Conn
  16. done *done.Instance
  17. }
  18. func (l *OutboundListener) add(conn net.Conn) {
  19. select {
  20. case l.buffer <- conn:
  21. case <-l.done.Wait():
  22. conn.Close()
  23. default:
  24. conn.Close()
  25. }
  26. }
  27. // Accept implements net.Listener.
  28. func (l *OutboundListener) Accept() (net.Conn, error) {
  29. select {
  30. case <-l.done.Wait():
  31. return nil, errors.New("listen closed")
  32. case c := <-l.buffer:
  33. return c, nil
  34. }
  35. }
  36. // Close implements net.Listener.
  37. func (l *OutboundListener) Close() error {
  38. common.Must(l.done.Close())
  39. L:
  40. for {
  41. select {
  42. case c := <-l.buffer:
  43. c.Close()
  44. default:
  45. break L
  46. }
  47. }
  48. return nil
  49. }
  50. // Addr implements net.Listener.
  51. func (l *OutboundListener) Addr() net.Addr {
  52. return &net.TCPAddr{
  53. IP: net.IP{0, 0, 0, 0},
  54. Port: 0,
  55. }
  56. }
  57. // Outbound is a outbound.Handler that handles gRPC connections.
  58. type Outbound struct {
  59. tag string
  60. listener *OutboundListener
  61. access sync.RWMutex
  62. closed bool
  63. }
  64. // Dispatch implements outbound.Handler.
  65. func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
  66. co.access.RLock()
  67. if co.closed {
  68. common.Interrupt(link.Reader)
  69. common.Interrupt(link.Writer)
  70. co.access.RUnlock()
  71. return
  72. }
  73. closeSignal := done.New()
  74. c := cnc.NewConnection(cnc.ConnectionInputMulti(link.Writer), cnc.ConnectionOutputMulti(link.Reader), cnc.ConnectionOnClose(closeSignal))
  75. co.listener.add(c)
  76. co.access.RUnlock()
  77. <-closeSignal.Wait()
  78. }
  79. // Tag implements outbound.Handler.
  80. func (co *Outbound) Tag() string {
  81. return co.tag
  82. }
  83. // Start implements common.Runnable.
  84. func (co *Outbound) Start() error {
  85. co.access.Lock()
  86. co.closed = false
  87. co.access.Unlock()
  88. return nil
  89. }
  90. // Close implements common.Closable.
  91. func (co *Outbound) Close() error {
  92. co.access.Lock()
  93. defer co.access.Unlock()
  94. co.closed = true
  95. return co.listener.Close()
  96. }
  97. // SenderSettings implements outbound.Handler.
  98. func (co *Outbound) SenderSettings() *serial.TypedMessage {
  99. return nil
  100. }
  101. // ProxySettings implements outbound.Handler.
  102. func (co *Outbound) ProxySettings() *serial.TypedMessage {
  103. return nil
  104. }