writer.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package mux
  2. import (
  3. "github.com/xtls/xray-core/common"
  4. "github.com/xtls/xray-core/common/buf"
  5. "github.com/xtls/xray-core/common/net"
  6. "github.com/xtls/xray-core/common/protocol"
  7. "github.com/xtls/xray-core/common/serial"
  8. )
  9. type Writer struct {
  10. dest net.Destination
  11. writer buf.Writer
  12. id uint16
  13. followup bool
  14. hasError bool
  15. transferType protocol.TransferType
  16. globalID [8]byte
  17. }
  18. func NewWriter(id uint16, dest net.Destination, writer buf.Writer, transferType protocol.TransferType, globalID [8]byte) *Writer {
  19. return &Writer{
  20. id: id,
  21. dest: dest,
  22. writer: writer,
  23. followup: false,
  24. transferType: transferType,
  25. globalID: globalID,
  26. }
  27. }
  28. func NewResponseWriter(id uint16, writer buf.Writer, transferType protocol.TransferType) *Writer {
  29. return &Writer{
  30. id: id,
  31. writer: writer,
  32. followup: true,
  33. transferType: transferType,
  34. }
  35. }
  36. func (w *Writer) getNextFrameMeta() FrameMetadata {
  37. meta := FrameMetadata{
  38. SessionID: w.id,
  39. Target: w.dest,
  40. GlobalID: w.globalID,
  41. }
  42. if w.followup {
  43. meta.SessionStatus = SessionStatusKeep
  44. } else {
  45. w.followup = true
  46. meta.SessionStatus = SessionStatusNew
  47. }
  48. return meta
  49. }
  50. func (w *Writer) writeMetaOnly() error {
  51. meta := w.getNextFrameMeta()
  52. b := buf.New()
  53. if err := meta.WriteTo(b); err != nil {
  54. return err
  55. }
  56. return w.writer.WriteMultiBuffer(buf.MultiBuffer{b})
  57. }
  58. func writeMetaWithFrame(writer buf.Writer, meta FrameMetadata, data buf.MultiBuffer) error {
  59. frame := buf.New()
  60. if len(data) == 1 {
  61. frame.UDP = data[0].UDP
  62. }
  63. if err := meta.WriteTo(frame); err != nil {
  64. return err
  65. }
  66. if _, err := serial.WriteUint16(frame, uint16(data.Len())); err != nil {
  67. return err
  68. }
  69. mb2 := make(buf.MultiBuffer, 0, len(data)+1)
  70. mb2 = append(mb2, frame)
  71. mb2 = append(mb2, data...)
  72. return writer.WriteMultiBuffer(mb2)
  73. }
  74. func (w *Writer) writeData(mb buf.MultiBuffer) error {
  75. meta := w.getNextFrameMeta()
  76. meta.Option.Set(OptionData)
  77. return writeMetaWithFrame(w.writer, meta, mb)
  78. }
  79. // WriteMultiBuffer implements buf.Writer.
  80. func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error {
  81. defer buf.ReleaseMulti(mb)
  82. if mb.IsEmpty() {
  83. return w.writeMetaOnly()
  84. }
  85. for !mb.IsEmpty() {
  86. var chunk buf.MultiBuffer
  87. if w.transferType == protocol.TransferTypeStream {
  88. mb, chunk = buf.SplitSize(mb, 8*1024)
  89. } else {
  90. mb2, b := buf.SplitFirst(mb)
  91. mb = mb2
  92. chunk = buf.MultiBuffer{b}
  93. }
  94. if err := w.writeData(chunk); err != nil {
  95. return err
  96. }
  97. }
  98. return nil
  99. }
  100. // Close implements common.Closable.
  101. func (w *Writer) Close() error {
  102. meta := FrameMetadata{
  103. SessionID: w.id,
  104. SessionStatus: SessionStatusEnd,
  105. }
  106. if w.hasError {
  107. meta.Option.Set(OptionError)
  108. }
  109. frame := buf.New()
  110. common.Must(meta.WriteTo(frame))
  111. w.writer.WriteMultiBuffer(buf.MultiBuffer{frame})
  112. return nil
  113. }