|
@@ -30,10 +30,15 @@ var errSniffingTimeout = newError("timeout on sniffing")
|
|
|
|
|
|
type cachedReader struct {
|
|
|
sync.Mutex
|
|
|
- reader *pipe.Reader
|
|
|
+ reader timeoutReader
|
|
|
cache buf.MultiBuffer
|
|
|
}
|
|
|
|
|
|
+type timeoutReader interface {
|
|
|
+ buf.Reader
|
|
|
+ buf.TimeoutReader
|
|
|
+}
|
|
|
+
|
|
|
func (r *cachedReader) Cache(b *buf.Buffer) {
|
|
|
mb, _ := r.reader.ReadMultiBufferTimeout(time.Millisecond * 100)
|
|
|
r.Lock()
|
|
@@ -84,7 +89,7 @@ func (r *cachedReader) Interrupt() {
|
|
|
r.cache = buf.ReleaseMulti(r.cache)
|
|
|
}
|
|
|
r.Unlock()
|
|
|
- r.reader.Interrupt()
|
|
|
+ common.Interrupt(r.reader)
|
|
|
}
|
|
|
|
|
|
// DefaultDispatcher is a default implementation of Dispatcher.
|
|
@@ -345,7 +350,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
|
|
|
d.routedDispatch(ctx, outbound, destination)
|
|
|
} else {
|
|
|
cReader := &cachedReader{
|
|
|
- reader: outbound.Reader.(*pipe.Reader),
|
|
|
+ reader: outbound.Reader.(timeoutReader),
|
|
|
}
|
|
|
outbound.Reader = cReader
|
|
|
result, err := sniffer(ctx, cReader, sniffingRequest.MetadataOnly, destination.Network)
|