// server_test.go (3.2 KB)

  1. package mux_test
  2. import (
  3. "context"
  4. "testing"
  5. "github.com/xtls/xray-core/common"
  6. "github.com/xtls/xray-core/common/buf"
  7. "github.com/xtls/xray-core/common/mux"
  8. "github.com/xtls/xray-core/common/net"
  9. "github.com/xtls/xray-core/common/session"
  10. "github.com/xtls/xray-core/features/routing"
  11. "github.com/xtls/xray-core/transport"
  12. "github.com/xtls/xray-core/transport/pipe"
  13. )
  14. func newLinkPair() (*transport.Link, *transport.Link) {
  15. opt := pipe.WithoutSizeLimit()
  16. uplinkReader, uplinkWriter := pipe.New(opt)
  17. downlinkReader, downlinkWriter := pipe.New(opt)
  18. uplink := &transport.Link{
  19. Reader: uplinkReader,
  20. Writer: downlinkWriter,
  21. }
  22. downlink := &transport.Link{
  23. Reader: downlinkReader,
  24. Writer: uplinkWriter,
  25. }
  26. return uplink, downlink
  27. }
  28. type TestDispatcher struct {
  29. OnDispatch func(ctx context.Context, dest net.Destination) (*transport.Link, error)
  30. }
  31. func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) {
  32. return d.OnDispatch(ctx, dest)
  33. }
  34. func (d *TestDispatcher) DispatchLink(ctx context.Context, destination net.Destination, outbound *transport.Link) error {
  35. return nil
  36. }
  37. func (d *TestDispatcher) Start() error {
  38. return nil
  39. }
  40. func (d *TestDispatcher) Close() error {
  41. return nil
  42. }
  43. func (*TestDispatcher) Type() interface{} {
  44. return routing.DispatcherType()
  45. }
  46. func TestRegressionOutboundLeak(t *testing.T) {
  47. originalOutbounds := []*session.Outbound{{}}
  48. serverCtx := session.ContextWithOutbounds(context.Background(), originalOutbounds)
  49. websiteUplink, websiteDownlink := newLinkPair()
  50. dispatcher := TestDispatcher{
  51. OnDispatch: func(ctx context.Context, dest net.Destination) (*transport.Link, error) {
  52. // emulate what DefaultRouter.Dispatch does, and mutate something on the context
  53. ob := session.OutboundsFromContext(ctx)[0]
  54. ob.Target = dest
  55. return websiteDownlink, nil
  56. },
  57. }
  58. muxServerUplink, muxServerDownlink := newLinkPair()
  59. _, err := mux.NewServerWorker(serverCtx, &dispatcher, muxServerUplink)
  60. common.Must(err)
  61. client, err := mux.NewClientWorker(*muxServerDownlink, mux.ClientStrategy{})
  62. common.Must(err)
  63. clientCtx := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{
  64. Target: net.TCPDestination(net.DomainAddress("www.example.com"), 80),
  65. }})
  66. muxClientUplink, muxClientDownlink := newLinkPair()
  67. ok := client.Dispatch(clientCtx, muxClientUplink)
  68. if !ok {
  69. t.Error("failed to dispatch")
  70. }
  71. {
  72. b := buf.FromBytes([]byte("hello"))
  73. common.Must(muxClientDownlink.Writer.WriteMultiBuffer(buf.MultiBuffer{b}))
  74. }
  75. resMb, err := websiteUplink.Reader.ReadMultiBuffer()
  76. common.Must(err)
  77. res := resMb.String()
  78. if res != "hello" {
  79. t.Error("upload: ", res)
  80. }
  81. {
  82. b := buf.FromBytes([]byte("world"))
  83. common.Must(websiteUplink.Writer.WriteMultiBuffer(buf.MultiBuffer{b}))
  84. }
  85. resMb, err = muxClientDownlink.Reader.ReadMultiBuffer()
  86. common.Must(err)
  87. res = resMb.String()
  88. if res != "world" {
  89. t.Error("download: ", res)
  90. }
  91. outbounds := session.OutboundsFromContext(serverCtx)
  92. if outbounds[0] != originalOutbounds[0] {
  93. t.Error("outbound got reassigned: ", outbounds[0])
  94. }
  95. if outbounds[0].Target.Address != nil {
  96. t.Error("outbound target got leaked: ", outbounds[0].Target.String())
  97. }
  98. }