loopback.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package loopback
  2. import (
  3. "context"
  4. "github.com/xtls/xray-core/common"
  5. "github.com/xtls/xray-core/common/buf"
  6. "github.com/xtls/xray-core/common/net"
  7. "github.com/xtls/xray-core/common/net/cnc"
  8. "github.com/xtls/xray-core/common/retry"
  9. "github.com/xtls/xray-core/common/session"
  10. "github.com/xtls/xray-core/common/task"
  11. "github.com/xtls/xray-core/core"
  12. "github.com/xtls/xray-core/features/routing"
  13. "github.com/xtls/xray-core/transport"
  14. "github.com/xtls/xray-core/transport/internet"
  15. )
  16. type Loopback struct {
  17. config *Config
  18. dispatcherInstance routing.Dispatcher
  19. }
  20. func (l *Loopback) Process(ctx context.Context, link *transport.Link, _ internet.Dialer) error {
  21. outbounds := session.OutboundsFromContext(ctx)
  22. ob := outbounds[len(outbounds) - 1]
  23. if !ob.Target.IsValid() {
  24. return newError("target not specified.")
  25. }
  26. ob.Name = "loopback"
  27. destination := ob.Target
  28. newError("opening connection to ", destination).WriteToLog(session.ExportIDToError(ctx))
  29. input := link.Reader
  30. output := link.Writer
  31. var conn net.Conn
  32. err := retry.ExponentialBackoff(2, 100).On(func() error {
  33. dialDest := destination
  34. content := new(session.Content)
  35. content.SkipDNSResolve = true
  36. ctx = session.ContextWithContent(ctx, content)
  37. inbound := session.InboundFromContext(ctx)
  38. inbound.Tag = l.config.InboundTag
  39. ctx = session.ContextWithInbound(ctx, inbound)
  40. rawConn, err := l.dispatcherInstance.Dispatch(ctx, dialDest)
  41. if err != nil {
  42. return err
  43. }
  44. var readerOpt cnc.ConnectionOption
  45. if dialDest.Network == net.Network_TCP {
  46. readerOpt = cnc.ConnectionOutputMulti(rawConn.Reader)
  47. } else {
  48. readerOpt = cnc.ConnectionOutputMultiUDP(rawConn.Reader)
  49. }
  50. conn = cnc.NewConnection(cnc.ConnectionInputMulti(rawConn.Writer), readerOpt)
  51. return nil
  52. })
  53. if err != nil {
  54. return newError("failed to open connection to ", destination).Base(err)
  55. }
  56. defer conn.Close()
  57. requestDone := func() error {
  58. var writer buf.Writer
  59. if destination.Network == net.Network_TCP {
  60. writer = buf.NewWriter(conn)
  61. } else {
  62. writer = &buf.SequentialWriter{Writer: conn}
  63. }
  64. if err := buf.Copy(input, writer); err != nil {
  65. return newError("failed to process request").Base(err)
  66. }
  67. return nil
  68. }
  69. responseDone := func() error {
  70. var reader buf.Reader
  71. if destination.Network == net.Network_TCP {
  72. reader = buf.NewReader(conn)
  73. } else {
  74. reader = buf.NewPacketReader(conn)
  75. }
  76. if err := buf.Copy(reader, output); err != nil {
  77. return newError("failed to process response").Base(err)
  78. }
  79. return nil
  80. }
  81. if err := task.Run(ctx, requestDone, task.OnSuccess(responseDone, task.Close(output))); err != nil {
  82. return newError("connection ends").Base(err)
  83. }
  84. return nil
  85. }
  86. func (l *Loopback) init(config *Config, dispatcherInstance routing.Dispatcher) error {
  87. l.dispatcherInstance = dispatcherInstance
  88. l.config = config
  89. return nil
  90. }
  91. func init() {
  92. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  93. l := new(Loopback)
  94. err := core.RequireFeatures(ctx, func(dispatcherInstance routing.Dispatcher) error {
  95. return l.init(config.(*Config), dispatcherInstance)
  96. })
  97. return l, err
  98. }))
  99. }