client_test.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package mux_test
  2. import (
  3. "context"
  4. "testing"
  5. "time"
  6. "github.com/golang/mock/gomock"
  7. "github.com/xtls/xray-core/common"
  8. "github.com/xtls/xray-core/common/errors"
  9. "github.com/xtls/xray-core/common/mux"
  10. "github.com/xtls/xray-core/common/net"
  11. "github.com/xtls/xray-core/common/session"
  12. "github.com/xtls/xray-core/testing/mocks"
  13. "github.com/xtls/xray-core/transport"
  14. "github.com/xtls/xray-core/transport/pipe"
  15. )
  16. func TestIncrementalPickerFailure(t *testing.T) {
  17. mockCtl := gomock.NewController(t)
  18. defer mockCtl.Finish()
  19. mockWorkerFactory := mocks.NewMuxClientWorkerFactory(mockCtl)
  20. mockWorkerFactory.EXPECT().Create().Return(nil, errors.New("test"))
  21. picker := mux.IncrementalWorkerPicker{
  22. Factory: mockWorkerFactory,
  23. }
  24. _, err := picker.PickAvailable()
  25. if err == nil {
  26. t.Error("expected error, but nil")
  27. }
  28. }
  29. func TestClientWorkerEOF(t *testing.T) {
  30. reader, writer := pipe.New(pipe.WithoutSizeLimit())
  31. common.Must(writer.Close())
  32. worker, err := mux.NewClientWorker(transport.Link{Reader: reader, Writer: writer}, mux.ClientStrategy{})
  33. common.Must(err)
  34. time.Sleep(time.Millisecond * 500)
  35. f := worker.Dispatch(context.Background(), nil)
  36. if f {
  37. t.Error("expected failed dispatching, but actually not")
  38. }
  39. }
  40. func TestClientWorkerClose(t *testing.T) {
  41. mockCtl := gomock.NewController(t)
  42. defer mockCtl.Finish()
  43. r1, w1 := pipe.New(pipe.WithoutSizeLimit())
  44. worker1, err := mux.NewClientWorker(transport.Link{
  45. Reader: r1,
  46. Writer: w1,
  47. }, mux.ClientStrategy{
  48. MaxConcurrency: 4,
  49. MaxConnection: 4,
  50. })
  51. common.Must(err)
  52. r2, w2 := pipe.New(pipe.WithoutSizeLimit())
  53. worker2, err := mux.NewClientWorker(transport.Link{
  54. Reader: r2,
  55. Writer: w2,
  56. }, mux.ClientStrategy{
  57. MaxConcurrency: 4,
  58. MaxConnection: 4,
  59. })
  60. common.Must(err)
  61. factory := mocks.NewMuxClientWorkerFactory(mockCtl)
  62. gomock.InOrder(
  63. factory.EXPECT().Create().Return(worker1, nil),
  64. factory.EXPECT().Create().Return(worker2, nil),
  65. )
  66. picker := &mux.IncrementalWorkerPicker{
  67. Factory: factory,
  68. }
  69. manager := &mux.ClientManager{
  70. Picker: picker,
  71. }
  72. tr1, tw1 := pipe.New(pipe.WithoutSizeLimit())
  73. ctx1 := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{
  74. Target: net.TCPDestination(net.DomainAddress("www.example.com"), 80),
  75. }})
  76. common.Must(manager.Dispatch(ctx1, &transport.Link{
  77. Reader: tr1,
  78. Writer: tw1,
  79. }))
  80. defer tw1.Close()
  81. common.Must(w1.Close())
  82. time.Sleep(time.Millisecond * 500)
  83. if !worker1.Closed() {
  84. t.Error("worker1 is not finished")
  85. }
  86. tr2, tw2 := pipe.New(pipe.WithoutSizeLimit())
  87. ctx2 := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{
  88. Target: net.TCPDestination(net.DomainAddress("www.example.com"), 80),
  89. }})
  90. common.Must(manager.Dispatch(ctx2, &transport.Link{
  91. Reader: tr2,
  92. Writer: tw2,
  93. }))
  94. defer tw2.Close()
  95. common.Must(w2.Close())
  96. }