pipe.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package pipe
  2. import (
  3. "context"
  4. "github.com/xtls/xray-core/common/buf"
  5. "github.com/xtls/xray-core/common/signal"
  6. "github.com/xtls/xray-core/common/signal/done"
  7. "github.com/xtls/xray-core/features/policy"
  8. )
  9. // Option for creating new Pipes.
  10. type Option func(*pipeOption)
  11. // WithoutSizeLimit returns an Option for Pipe to have no size limit.
  12. func WithoutSizeLimit() Option {
  13. return func(opt *pipeOption) {
  14. opt.limit = -1
  15. }
  16. }
  17. // WithSizeLimit returns an Option for Pipe to have the given size limit.
  18. func WithSizeLimit(limit int32) Option {
  19. return func(opt *pipeOption) {
  20. opt.limit = limit
  21. }
  22. }
  23. func OnTransmission(hook func(mb buf.MultiBuffer) buf.MultiBuffer) Option {
  24. return func(option *pipeOption) {
  25. option.onTransmission = hook
  26. }
  27. }
  28. // DiscardOverflow returns an Option for Pipe to discard writes if full.
  29. func DiscardOverflow() Option {
  30. return func(opt *pipeOption) {
  31. opt.discardOverflow = true
  32. }
  33. }
  34. // OptionsFromContext returns a list of Options from context.
  35. func OptionsFromContext(ctx context.Context) []Option {
  36. var opt []Option
  37. bp := policy.BufferPolicyFromContext(ctx)
  38. if bp.PerConnection >= 0 {
  39. opt = append(opt, WithSizeLimit(bp.PerConnection))
  40. } else {
  41. opt = append(opt, WithoutSizeLimit())
  42. }
  43. return opt
  44. }
  45. // New creates a new Reader and Writer that connects to each other.
  46. func New(opts ...Option) (*Reader, *Writer) {
  47. p := &pipe{
  48. readSignal: signal.NewNotifier(),
  49. writeSignal: signal.NewNotifier(),
  50. done: done.New(),
  51. errChan: make(chan error, 1),
  52. option: pipeOption{
  53. limit: -1,
  54. },
  55. }
  56. for _, opt := range opts {
  57. opt(&(p.option))
  58. }
  59. return &Reader{
  60. pipe: p,
  61. }, &Writer{
  62. pipe: p,
  63. }
  64. }