| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- package pipe
- import (
- "context"
- "github.com/xtls/xray-core/common/buf"
- "github.com/xtls/xray-core/common/signal"
- "github.com/xtls/xray-core/common/signal/done"
- "github.com/xtls/xray-core/features/policy"
- )
- // Option for creating new Pipes.
- type Option func(*pipeOption)
- // WithoutSizeLimit returns an Option for Pipe to have no size limit.
- func WithoutSizeLimit() Option {
- return func(opt *pipeOption) {
- opt.limit = -1
- }
- }
- // WithSizeLimit returns an Option for Pipe to have the given size limit.
- func WithSizeLimit(limit int32) Option {
- return func(opt *pipeOption) {
- opt.limit = limit
- }
- }
- func OnTransmission(hook func(mb buf.MultiBuffer) buf.MultiBuffer) Option {
- return func(option *pipeOption) {
- option.onTransmission = hook
- }
- }
- // DiscardOverflow returns an Option for Pipe to discard writes if full.
- func DiscardOverflow() Option {
- return func(opt *pipeOption) {
- opt.discardOverflow = true
- }
- }
- // OptionsFromContext returns a list of Options from context.
- func OptionsFromContext(ctx context.Context) []Option {
- var opt []Option
- bp := policy.BufferPolicyFromContext(ctx)
- if bp.PerConnection >= 0 {
- opt = append(opt, WithSizeLimit(bp.PerConnection))
- } else {
- opt = append(opt, WithoutSizeLimit())
- }
- return opt
- }
- // New creates a new Reader and Writer that connects to each other.
- func New(opts ...Option) (*Reader, *Writer) {
- p := &pipe{
- readSignal: signal.NewNotifier(),
- writeSignal: signal.NewNotifier(),
- done: done.New(),
- errChan: make(chan error, 1),
- option: pipeOption{
- limit: -1,
- },
- }
- for _, opt := range opts {
- opt(&(p.option))
- }
- return &Reader{
- pipe: p,
- }, &Writer{
- pipe: p,
- }
- }
|