hub.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package udp
  2. import (
  3. "context"
  4. "github.com/xtls/xray-core/common/buf"
  5. "github.com/xtls/xray-core/common/errors"
  6. "github.com/xtls/xray-core/common/net"
  7. "github.com/xtls/xray-core/common/protocol/udp"
  8. "github.com/xtls/xray-core/transport/internet"
  9. )
  10. type HubOption func(h *Hub)
  11. func HubCapacity(capacity int) HubOption {
  12. return func(h *Hub) {
  13. h.capacity = capacity
  14. }
  15. }
  16. func HubReceiveOriginalDestination(r bool) HubOption {
  17. return func(h *Hub) {
  18. h.recvOrigDest = r
  19. }
  20. }
  21. type Hub struct {
  22. conn *net.UDPConn
  23. cache chan *udp.Packet
  24. capacity int
  25. recvOrigDest bool
  26. }
  27. func ListenUDP(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, options ...HubOption) (*Hub, error) {
  28. hub := &Hub{
  29. capacity: 256,
  30. recvOrigDest: false,
  31. }
  32. for _, opt := range options {
  33. opt(hub)
  34. }
  35. var sockopt *internet.SocketConfig
  36. if streamSettings != nil {
  37. sockopt = streamSettings.SocketSettings
  38. }
  39. if sockopt != nil && sockopt.ReceiveOriginalDestAddress {
  40. hub.recvOrigDest = true
  41. }
  42. udpConn, err := internet.ListenSystemPacket(ctx, &net.UDPAddr{
  43. IP: address.IP(),
  44. Port: int(port),
  45. }, sockopt)
  46. if err != nil {
  47. return nil, err
  48. }
  49. errors.LogInfo(ctx, "listening UDP on ", address, ":", port)
  50. hub.conn = udpConn.(*net.UDPConn)
  51. hub.cache = make(chan *udp.Packet, hub.capacity)
  52. go hub.start()
  53. return hub, nil
  54. }
  55. // Close implements net.Listener.
  56. func (h *Hub) Close() error {
  57. h.conn.Close()
  58. return nil
  59. }
  60. func (h *Hub) WriteTo(payload []byte, dest net.Destination) (int, error) {
  61. return h.conn.WriteToUDP(payload, &net.UDPAddr{
  62. IP: dest.Address.IP(),
  63. Port: int(dest.Port),
  64. })
  65. }
  66. func (h *Hub) start() {
  67. c := h.cache
  68. defer close(c)
  69. oobBytes := make([]byte, 256)
  70. for {
  71. buffer := buf.New()
  72. var noob int
  73. var addr *net.UDPAddr
  74. rawBytes := buffer.Extend(buf.Size)
  75. n, noob, _, addr, err := ReadUDPMsg(h.conn, rawBytes, oobBytes)
  76. if err != nil {
  77. errors.LogInfoInner(context.Background(), err, "failed to read UDP msg")
  78. buffer.Release()
  79. break
  80. }
  81. buffer.Resize(0, int32(n))
  82. if buffer.IsEmpty() {
  83. buffer.Release()
  84. continue
  85. }
  86. payload := &udp.Packet{
  87. Payload: buffer,
  88. Source: net.UDPDestination(net.IPAddress(addr.IP), net.Port(addr.Port)),
  89. }
  90. if h.recvOrigDest && noob > 0 {
  91. payload.Target = RetrieveOriginalDest(oobBytes[:noob])
  92. if payload.Target.IsValid() {
  93. errors.LogDebug(context.Background(), "UDP original destination: ", payload.Target)
  94. } else {
  95. errors.LogInfo(context.Background(), "failed to read UDP original destination")
  96. }
  97. }
  98. select {
  99. case c <- payload:
  100. default:
  101. buffer.Release()
  102. payload.Payload = nil
  103. }
  104. }
  105. }
  106. // Addr implements net.Listener.
  107. func (h *Hub) Addr() net.Addr {
  108. return h.conn.LocalAddr()
  109. }
  110. func (h *Hub) Receive() <-chan *udp.Packet {
  111. return h.cache
  112. }