writer.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package mux
  2. import (
  3. "github.com/xtls/xray-core/common"
  4. "github.com/xtls/xray-core/common/buf"
  5. "github.com/xtls/xray-core/common/net"
  6. "github.com/xtls/xray-core/common/protocol"
  7. "github.com/xtls/xray-core/common/serial"
  8. "github.com/xtls/xray-core/common/session"
  9. )
  10. type Writer struct {
  11. dest net.Destination
  12. writer buf.Writer
  13. id uint16
  14. followup bool
  15. hasError bool
  16. transferType protocol.TransferType
  17. globalID [8]byte
  18. inbound *session.Inbound
  19. }
  20. func NewWriter(id uint16, dest net.Destination, writer buf.Writer, transferType protocol.TransferType, globalID [8]byte, inbound *session.Inbound) *Writer {
  21. return &Writer{
  22. id: id,
  23. dest: dest,
  24. writer: writer,
  25. followup: false,
  26. transferType: transferType,
  27. globalID: globalID,
  28. inbound: inbound,
  29. }
  30. }
  31. func NewResponseWriter(id uint16, writer buf.Writer, transferType protocol.TransferType) *Writer {
  32. return &Writer{
  33. id: id,
  34. writer: writer,
  35. followup: true,
  36. transferType: transferType,
  37. }
  38. }
  39. func (w *Writer) getNextFrameMeta() FrameMetadata {
  40. meta := FrameMetadata{
  41. SessionID: w.id,
  42. Target: w.dest,
  43. GlobalID: w.globalID,
  44. Inbound: w.inbound,
  45. }
  46. if w.followup {
  47. meta.SessionStatus = SessionStatusKeep
  48. } else {
  49. w.followup = true
  50. meta.SessionStatus = SessionStatusNew
  51. }
  52. return meta
  53. }
  54. func (w *Writer) writeMetaOnly() error {
  55. meta := w.getNextFrameMeta()
  56. b := buf.New()
  57. if err := meta.WriteTo(b); err != nil {
  58. return err
  59. }
  60. return w.writer.WriteMultiBuffer(buf.MultiBuffer{b})
  61. }
  62. func writeMetaWithFrame(writer buf.Writer, meta FrameMetadata, data buf.MultiBuffer) error {
  63. frame := buf.New()
  64. if len(data) == 1 {
  65. frame.UDP = data[0].UDP
  66. }
  67. if err := meta.WriteTo(frame); err != nil {
  68. return err
  69. }
  70. if _, err := serial.WriteUint16(frame, uint16(data.Len())); err != nil {
  71. return err
  72. }
  73. mb2 := make(buf.MultiBuffer, 0, len(data)+1)
  74. mb2 = append(mb2, frame)
  75. mb2 = append(mb2, data...)
  76. return writer.WriteMultiBuffer(mb2)
  77. }
  78. func (w *Writer) writeData(mb buf.MultiBuffer) error {
  79. meta := w.getNextFrameMeta()
  80. meta.Option.Set(OptionData)
  81. return writeMetaWithFrame(w.writer, meta, mb)
  82. }
  83. // WriteMultiBuffer implements buf.Writer.
  84. func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error {
  85. defer buf.ReleaseMulti(mb)
  86. if mb.IsEmpty() {
  87. return w.writeMetaOnly()
  88. }
  89. for !mb.IsEmpty() {
  90. var chunk buf.MultiBuffer
  91. if w.transferType == protocol.TransferTypeStream {
  92. mb, chunk = buf.SplitSize(mb, 8*1024)
  93. } else {
  94. mb2, b := buf.SplitFirst(mb)
  95. mb = mb2
  96. chunk = buf.MultiBuffer{b}
  97. }
  98. if err := w.writeData(chunk); err != nil {
  99. return err
  100. }
  101. }
  102. return nil
  103. }
  104. // Close implements common.Closable.
  105. func (w *Writer) Close() error {
  106. meta := FrameMetadata{
  107. SessionID: w.id,
  108. SessionStatus: SessionStatusEnd,
  109. }
  110. if w.hasError {
  111. meta.Option.Set(OptionError)
  112. }
  113. frame := buf.New()
  114. common.Must(meta.WriteTo(frame))
  115. w.writer.WriteMultiBuffer(buf.MultiBuffer{frame})
  116. return nil
  117. }