loopback.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package loopback
  2. import (
  3. "context"
  4. "github.com/xtls/xray-core/common"
  5. "github.com/xtls/xray-core/common/buf"
  6. "github.com/xtls/xray-core/common/errors"
  7. "github.com/xtls/xray-core/common/net"
  8. "github.com/xtls/xray-core/common/net/cnc"
  9. "github.com/xtls/xray-core/common/retry"
  10. "github.com/xtls/xray-core/common/session"
  11. "github.com/xtls/xray-core/common/task"
  12. "github.com/xtls/xray-core/core"
  13. "github.com/xtls/xray-core/features/routing"
  14. "github.com/xtls/xray-core/transport"
  15. "github.com/xtls/xray-core/transport/internet"
  16. )
  17. type Loopback struct {
  18. config *Config
  19. dispatcherInstance routing.Dispatcher
  20. }
  21. func (l *Loopback) Process(ctx context.Context, link *transport.Link, _ internet.Dialer) error {
  22. outbounds := session.OutboundsFromContext(ctx)
  23. ob := outbounds[len(outbounds)-1]
  24. if !ob.Target.IsValid() {
  25. return errors.New("target not specified.")
  26. }
  27. ob.Name = "loopback"
  28. destination := ob.Target
  29. errors.LogInfo(ctx, "opening connection to ", destination)
  30. input := link.Reader
  31. output := link.Writer
  32. var conn net.Conn
  33. err := retry.ExponentialBackoff(2, 100).On(func() error {
  34. dialDest := destination
  35. content := new(session.Content)
  36. content.SkipDNSResolve = true
  37. ctx = session.ContextWithContent(ctx, content)
  38. inbound := session.InboundFromContext(ctx)
  39. if inbound == nil {
  40. inbound = &session.Inbound{}
  41. }
  42. inbound.Tag = l.config.InboundTag
  43. ctx = session.ContextWithInbound(ctx, inbound)
  44. rawConn, err := l.dispatcherInstance.Dispatch(ctx, dialDest)
  45. if err != nil {
  46. return err
  47. }
  48. var readerOpt cnc.ConnectionOption
  49. if dialDest.Network == net.Network_TCP {
  50. readerOpt = cnc.ConnectionOutputMulti(rawConn.Reader)
  51. } else {
  52. readerOpt = cnc.ConnectionOutputMultiUDP(rawConn.Reader)
  53. }
  54. conn = cnc.NewConnection(cnc.ConnectionInputMulti(rawConn.Writer), readerOpt)
  55. return nil
  56. })
  57. if err != nil {
  58. return errors.New("failed to open connection to ", destination).Base(err)
  59. }
  60. defer conn.Close()
  61. requestDone := func() error {
  62. var writer buf.Writer
  63. if destination.Network == net.Network_TCP {
  64. writer = buf.NewWriter(conn)
  65. } else {
  66. writer = &buf.SequentialWriter{Writer: conn}
  67. }
  68. if err := buf.Copy(input, writer); err != nil {
  69. return errors.New("failed to process request").Base(err)
  70. }
  71. return nil
  72. }
  73. responseDone := func() error {
  74. var reader buf.Reader
  75. if destination.Network == net.Network_TCP {
  76. reader = buf.NewReader(conn)
  77. } else {
  78. reader = buf.NewPacketReader(conn)
  79. }
  80. if err := buf.Copy(reader, output); err != nil {
  81. return errors.New("failed to process response").Base(err)
  82. }
  83. return nil
  84. }
  85. if err := task.Run(ctx, requestDone, task.OnSuccess(responseDone, task.Close(output))); err != nil {
  86. return errors.New("connection ends").Base(err)
  87. }
  88. return nil
  89. }
  90. func (l *Loopback) init(config *Config, dispatcherInstance routing.Dispatcher) error {
  91. l.dispatcherInstance = dispatcherInstance
  92. l.config = config
  93. return nil
  94. }
  95. func init() {
  96. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  97. l := new(Loopback)
  98. err := core.RequireFeatures(ctx, func(dispatcherInstance routing.Dispatcher) error {
  99. return l.init(config.(*Config), dispatcherInstance)
  100. })
  101. return l, err
  102. }))
  103. }