| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 | 
							- package mux
 
- import (
 
- 	"github.com/xtls/xray-core/common"
 
- 	"github.com/xtls/xray-core/common/buf"
 
- 	"github.com/xtls/xray-core/common/net"
 
- 	"github.com/xtls/xray-core/common/protocol"
 
- 	"github.com/xtls/xray-core/common/serial"
 
- )
 
- type Writer struct {
 
- 	dest         net.Destination
 
- 	writer       buf.Writer
 
- 	id           uint16
 
- 	followup     bool
 
- 	hasError     bool
 
- 	transferType protocol.TransferType
 
- }
 
- func NewWriter(id uint16, dest net.Destination, writer buf.Writer, transferType protocol.TransferType) *Writer {
 
- 	return &Writer{
 
- 		id:           id,
 
- 		dest:         dest,
 
- 		writer:       writer,
 
- 		followup:     false,
 
- 		transferType: transferType,
 
- 	}
 
- }
 
- func NewResponseWriter(id uint16, writer buf.Writer, transferType protocol.TransferType) *Writer {
 
- 	return &Writer{
 
- 		id:           id,
 
- 		writer:       writer,
 
- 		followup:     true,
 
- 		transferType: transferType,
 
- 	}
 
- }
 
- func (w *Writer) getNextFrameMeta() FrameMetadata {
 
- 	meta := FrameMetadata{
 
- 		SessionID: w.id,
 
- 		Target:    w.dest,
 
- 	}
 
- 	if w.followup {
 
- 		meta.SessionStatus = SessionStatusKeep
 
- 	} else {
 
- 		w.followup = true
 
- 		meta.SessionStatus = SessionStatusNew
 
- 	}
 
- 	return meta
 
- }
 
- func (w *Writer) writeMetaOnly() error {
 
- 	meta := w.getNextFrameMeta()
 
- 	b := buf.New()
 
- 	if err := meta.WriteTo(b); err != nil {
 
- 		return err
 
- 	}
 
- 	return w.writer.WriteMultiBuffer(buf.MultiBuffer{b})
 
- }
 
- func writeMetaWithFrame(writer buf.Writer, meta FrameMetadata, data buf.MultiBuffer) error {
 
- 	frame := buf.New()
 
- 	if len(data) == 1 {
 
- 		frame.UDP = data[0].UDP
 
- 	}
 
- 	if err := meta.WriteTo(frame); err != nil {
 
- 		return err
 
- 	}
 
- 	if _, err := serial.WriteUint16(frame, uint16(data.Len())); err != nil {
 
- 		return err
 
- 	}
 
- 	mb2 := make(buf.MultiBuffer, 0, len(data)+1)
 
- 	mb2 = append(mb2, frame)
 
- 	mb2 = append(mb2, data...)
 
- 	return writer.WriteMultiBuffer(mb2)
 
- }
 
- func (w *Writer) writeData(mb buf.MultiBuffer) error {
 
- 	meta := w.getNextFrameMeta()
 
- 	meta.Option.Set(OptionData)
 
- 	return writeMetaWithFrame(w.writer, meta, mb)
 
- }
 
- // WriteMultiBuffer implements buf.Writer.
 
- func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error {
 
- 	defer buf.ReleaseMulti(mb)
 
- 	if mb.IsEmpty() {
 
- 		return w.writeMetaOnly()
 
- 	}
 
- 	for !mb.IsEmpty() {
 
- 		var chunk buf.MultiBuffer
 
- 		if w.transferType == protocol.TransferTypeStream {
 
- 			mb, chunk = buf.SplitSize(mb, 8*1024)
 
- 		} else {
 
- 			mb2, b := buf.SplitFirst(mb)
 
- 			mb = mb2
 
- 			chunk = buf.MultiBuffer{b}
 
- 		}
 
- 		if err := w.writeData(chunk); err != nil {
 
- 			return err
 
- 		}
 
- 	}
 
- 	return nil
 
- }
 
- // Close implements common.Closable.
 
- func (w *Writer) Close() error {
 
- 	meta := FrameMetadata{
 
- 		SessionID:     w.id,
 
- 		SessionStatus: SessionStatusEnd,
 
- 	}
 
- 	if w.hasError {
 
- 		meta.Option.Set(OptionError)
 
- 	}
 
- 	frame := buf.New()
 
- 	common.Must(meta.WriteTo(frame))
 
- 	w.writer.WriteMultiBuffer(buf.MultiBuffer{frame})
 
- 	return nil
 
- }
 
 
  |