pipe.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package pipe
  2. import (
  3. "context"
  4. "github.com/xtls/xray-core/common/signal"
  5. "github.com/xtls/xray-core/common/signal/done"
  6. "github.com/xtls/xray-core/features/policy"
  7. )
  8. // Option for creating new Pipes.
  9. type Option func(*pipeOption)
  10. // WithoutSizeLimit returns an Option for Pipe to have no size limit.
  11. func WithoutSizeLimit() Option {
  12. return func(opt *pipeOption) {
  13. opt.limit = -1
  14. }
  15. }
  16. // WithSizeLimit returns an Option for Pipe to have the given size limit.
  17. func WithSizeLimit(limit int32) Option {
  18. return func(opt *pipeOption) {
  19. opt.limit = limit
  20. }
  21. }
  22. // DiscardOverflow returns an Option for Pipe to discard writes if full.
  23. func DiscardOverflow() Option {
  24. return func(opt *pipeOption) {
  25. opt.discardOverflow = true
  26. }
  27. }
  28. // OptionsFromContext returns a list of Options from context.
  29. func OptionsFromContext(ctx context.Context) []Option {
  30. var opt []Option
  31. bp := policy.BufferPolicyFromContext(ctx)
  32. if bp.PerConnection >= 0 {
  33. opt = append(opt, WithSizeLimit(bp.PerConnection))
  34. } else {
  35. opt = append(opt, WithoutSizeLimit())
  36. }
  37. return opt
  38. }
  39. // New creates a new Reader and Writer that connects to each other.
  40. func New(opts ...Option) (*Reader, *Writer) {
  41. p := &pipe{
  42. readSignal: signal.NewNotifier(),
  43. writeSignal: signal.NewNotifier(),
  44. done: done.New(),
  45. errChan: make(chan error, 1),
  46. option: pipeOption{
  47. limit: -1,
  48. },
  49. }
  50. for _, opt := range opts {
  51. opt(&(p.option))
  52. }
  53. return &Reader{
  54. pipe: p,
  55. }, &Writer{
  56. pipe: p,
  57. }
  58. }