loopback.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package loopback
  2. import (
  3. "context"
  4. "github.com/xtls/xray-core/common"
  5. "github.com/xtls/xray-core/common/buf"
  6. "github.com/xtls/xray-core/common/net"
  7. "github.com/xtls/xray-core/common/net/cnc"
  8. "github.com/xtls/xray-core/common/retry"
  9. "github.com/xtls/xray-core/common/session"
  10. "github.com/xtls/xray-core/common/task"
  11. "github.com/xtls/xray-core/core"
  12. "github.com/xtls/xray-core/features/routing"
  13. "github.com/xtls/xray-core/transport"
  14. "github.com/xtls/xray-core/transport/internet"
  15. )
  16. type Loopback struct {
  17. config *Config
  18. dispatcherInstance routing.Dispatcher
  19. }
  20. func (l *Loopback) Process(ctx context.Context, link *transport.Link, _ internet.Dialer) error {
  21. outbound := session.OutboundFromContext(ctx)
  22. if outbound == nil || !outbound.Target.IsValid() {
  23. return newError("target not specified.")
  24. }
  25. destination := outbound.Target
  26. newError("opening connection to ", destination).WriteToLog(session.ExportIDToError(ctx))
  27. input := link.Reader
  28. output := link.Writer
  29. var conn net.Conn
  30. err := retry.ExponentialBackoff(2, 100).On(func() error {
  31. dialDest := destination
  32. content := new(session.Content)
  33. content.SkipDNSResolve = true
  34. ctx = session.ContextWithContent(ctx, content)
  35. inbound := session.InboundFromContext(ctx)
  36. inbound.Tag = l.config.InboundTag
  37. ctx = session.ContextWithInbound(ctx, inbound)
  38. rawConn, err := l.dispatcherInstance.Dispatch(ctx, dialDest)
  39. if err != nil {
  40. return err
  41. }
  42. var readerOpt cnc.ConnectionOption
  43. if dialDest.Network == net.Network_TCP {
  44. readerOpt = cnc.ConnectionOutputMulti(rawConn.Reader)
  45. } else {
  46. readerOpt = cnc.ConnectionOutputMultiUDP(rawConn.Reader)
  47. }
  48. conn = cnc.NewConnection(cnc.ConnectionInputMulti(rawConn.Writer), readerOpt)
  49. return nil
  50. })
  51. if err != nil {
  52. return newError("failed to open connection to ", destination).Base(err)
  53. }
  54. defer conn.Close()
  55. requestDone := func() error {
  56. var writer buf.Writer
  57. if destination.Network == net.Network_TCP {
  58. writer = buf.NewWriter(conn)
  59. } else {
  60. writer = &buf.SequentialWriter{Writer: conn}
  61. }
  62. if err := buf.Copy(input, writer); err != nil {
  63. return newError("failed to process request").Base(err)
  64. }
  65. return nil
  66. }
  67. responseDone := func() error {
  68. var reader buf.Reader
  69. if destination.Network == net.Network_TCP {
  70. reader = buf.NewReader(conn)
  71. } else {
  72. reader = buf.NewPacketReader(conn)
  73. }
  74. if err := buf.Copy(reader, output); err != nil {
  75. return newError("failed to process response").Base(err)
  76. }
  77. return nil
  78. }
  79. if err := task.Run(ctx, requestDone, task.OnSuccess(responseDone, task.Close(output))); err != nil {
  80. return newError("connection ends").Base(err)
  81. }
  82. return nil
  83. }
  84. func (l *Loopback) init(config *Config, dispatcherInstance routing.Dispatcher) error {
  85. l.dispatcherInstance = dispatcherInstance
  86. l.config = config
  87. return nil
  88. }
  89. func init() {
  90. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  91. l := new(Loopback)
  92. err := core.RequireFeatures(ctx, func(dispatcherInstance routing.Dispatcher) error {
  93. return l.init(config.(*Config), dispatcherInstance)
  94. })
  95. return l, err
  96. }))
  97. }