// dispatcher_test.go (2.0 KB)
  1. package udp_test
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. "github.com/xtls/xray-core/common"
  8. "github.com/xtls/xray-core/common/buf"
  9. "github.com/xtls/xray-core/common/net"
  10. "github.com/xtls/xray-core/common/protocol/udp"
  11. "github.com/xtls/xray-core/features/routing"
  12. "github.com/xtls/xray-core/transport"
  13. . "github.com/xtls/xray-core/transport/internet/udp"
  14. "github.com/xtls/xray-core/transport/pipe"
  15. )
  16. type TestDispatcher struct {
  17. OnDispatch func(ctx context.Context, dest net.Destination) (*transport.Link, error)
  18. }
  19. func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) {
  20. return d.OnDispatch(ctx, dest)
  21. }
  22. func (d *TestDispatcher) Start() error {
  23. return nil
  24. }
  25. func (d *TestDispatcher) Close() error {
  26. return nil
  27. }
  28. func (*TestDispatcher) Type() interface{} {
  29. return routing.DispatcherType()
  30. }
  31. func TestSameDestinationDispatching(t *testing.T) {
  32. ctx, cancel := context.WithCancel(context.Background())
  33. uplinkReader, uplinkWriter := pipe.New(pipe.WithSizeLimit(1024))
  34. downlinkReader, downlinkWriter := pipe.New(pipe.WithSizeLimit(1024))
  35. go func() {
  36. for {
  37. data, err := uplinkReader.ReadMultiBuffer()
  38. if err != nil {
  39. break
  40. }
  41. err = downlinkWriter.WriteMultiBuffer(data)
  42. common.Must(err)
  43. }
  44. }()
  45. var count uint32
  46. td := &TestDispatcher{
  47. OnDispatch: func(ctx context.Context, dest net.Destination) (*transport.Link, error) {
  48. atomic.AddUint32(&count, 1)
  49. return &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
  50. },
  51. }
  52. dest := net.UDPDestination(net.LocalHostIP, 53)
  53. b := buf.New()
  54. b.WriteString("abcd")
  55. var msgCount uint32
  56. dispatcher := NewDispatcher(td, func(ctx context.Context, packet *udp.Packet) {
  57. atomic.AddUint32(&msgCount, 1)
  58. })
  59. dispatcher.Dispatch(ctx, dest, b)
  60. for i := 0; i < 5; i++ {
  61. dispatcher.Dispatch(ctx, dest, b)
  62. }
  63. time.Sleep(time.Second)
  64. cancel()
  65. if count != 1 {
  66. t.Error("count: ", count)
  67. }
  68. if v := atomic.LoadUint32(&msgCount); v != 6 {
  69. t.Error("msgCount: ", v)
  70. }
  71. }