hunkconn.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package encoding
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "github.com/xtls/xray-core/common/buf"
  7. xnet "github.com/xtls/xray-core/common/net"
  8. "github.com/xtls/xray-core/common/net/cnc"
  9. "github.com/xtls/xray-core/common/signal/done"
  10. "google.golang.org/grpc/metadata"
  11. "google.golang.org/grpc/peer"
  12. )
  13. type HunkConn interface {
  14. Context() context.Context
  15. Send(*Hunk) error
  16. Recv() (*Hunk, error)
  17. SendMsg(m interface{}) error
  18. RecvMsg(m interface{}) error
  19. }
  20. type StreamCloser interface {
  21. CloseSend() error
  22. }
  23. type HunkReaderWriter struct {
  24. hc HunkConn
  25. cancel context.CancelFunc
  26. done *done.Instance
  27. buf []byte
  28. index int
  29. }
  30. func NewHunkReadWriter(hc HunkConn, cancel context.CancelFunc) *HunkReaderWriter {
  31. return &HunkReaderWriter{hc, cancel, done.New(), nil, 0}
  32. }
  33. func NewHunkConn(hc HunkConn, cancel context.CancelFunc) net.Conn {
  34. var rAddr net.Addr
  35. pr, ok := peer.FromContext(hc.Context())
  36. if ok {
  37. rAddr = pr.Addr
  38. } else {
  39. rAddr = &net.TCPAddr{
  40. IP: []byte{0, 0, 0, 0},
  41. Port: 0,
  42. }
  43. }
  44. md, ok := metadata.FromIncomingContext(hc.Context())
  45. if ok {
  46. header := md.Get("x-real-ip")
  47. if len(header) > 0 {
  48. realip := xnet.ParseAddress(header[0])
  49. if realip.Family().IsIP() {
  50. rAddr = &net.TCPAddr{
  51. IP: realip.IP(),
  52. Port: 0,
  53. }
  54. }
  55. }
  56. }
  57. wrc := NewHunkReadWriter(hc, cancel)
  58. return cnc.NewConnection(
  59. cnc.ConnectionInput(wrc),
  60. cnc.ConnectionOutput(wrc),
  61. cnc.ConnectionOnClose(wrc),
  62. cnc.ConnectionRemoteAddr(rAddr),
  63. )
  64. }
  65. func (h *HunkReaderWriter) forceFetch() error {
  66. hunk, err := h.hc.Recv()
  67. if err != nil {
  68. if err == io.EOF {
  69. return err
  70. }
  71. return newError("failed to fetch hunk from gRPC tunnel").Base(err)
  72. }
  73. h.buf = hunk.Data
  74. h.index = 0
  75. return nil
  76. }
  77. func (h *HunkReaderWriter) Read(buf []byte) (int, error) {
  78. if h.done.Done() {
  79. return 0, io.EOF
  80. }
  81. if h.index >= len(h.buf) {
  82. if err := h.forceFetch(); err != nil {
  83. return 0, err
  84. }
  85. }
  86. n := copy(buf, h.buf[h.index:])
  87. h.index += n
  88. return n, nil
  89. }
  90. func (h *HunkReaderWriter) ReadMultiBuffer() (buf.MultiBuffer, error) {
  91. if h.done.Done() {
  92. return nil, io.EOF
  93. }
  94. if h.index >= len(h.buf) {
  95. if err := h.forceFetch(); err != nil {
  96. return nil, err
  97. }
  98. }
  99. if cap(h.buf) >= buf.Size {
  100. b := h.buf
  101. h.index = len(h.buf)
  102. return buf.MultiBuffer{buf.NewExisted(b)}, nil
  103. }
  104. b := buf.New()
  105. _, err := b.ReadFrom(h)
  106. if err != nil {
  107. return nil, err
  108. }
  109. return buf.MultiBuffer{b}, nil
  110. }
  111. func (h *HunkReaderWriter) Write(buf []byte) (int, error) {
  112. if h.done.Done() {
  113. return 0, io.ErrClosedPipe
  114. }
  115. err := h.hc.Send(&Hunk{Data: buf[:]})
  116. if err != nil {
  117. return 0, newError("failed to send data over gRPC tunnel").Base(err)
  118. }
  119. return len(buf), nil
  120. }
  121. func (h *HunkReaderWriter) Close() error {
  122. if h.cancel != nil {
  123. h.cancel()
  124. }
  125. if sc, match := h.hc.(StreamCloser); match {
  126. return sc.CloseSend()
  127. }
  128. return h.done.Close()
  129. }