pipe.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package memnet
  4. import (
  5. "bytes"
  6. "context"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net"
  11. "os"
  12. "sync"
  13. "time"
  14. )
  15. const debugPipe = false
  16. // Pipe implements an in-memory FIFO with timeouts.
  17. type Pipe struct {
  18. name string
  19. maxBuf int
  20. mu sync.Mutex
  21. cnd *sync.Cond
  22. blocked bool
  23. closed bool
  24. buf bytes.Buffer
  25. readTimeout time.Time
  26. writeTimeout time.Time
  27. cancelReadTimer func()
  28. cancelWriteTimer func()
  29. }
  30. // NewPipe creates a Pipe with a buffer size fixed at maxBuf.
  31. func NewPipe(name string, maxBuf int) *Pipe {
  32. p := &Pipe{
  33. name: name,
  34. maxBuf: maxBuf,
  35. }
  36. p.cnd = sync.NewCond(&p.mu)
  37. return p
  38. }
  39. // readOrBlock attempts to read from the buffer, if the buffer is empty and
  40. // the connection hasn't been closed it will block until there is a change.
  41. func (p *Pipe) readOrBlock(b []byte) (int, error) {
  42. p.mu.Lock()
  43. defer p.mu.Unlock()
  44. if !p.readTimeout.IsZero() && !time.Now().Before(p.readTimeout) {
  45. return 0, os.ErrDeadlineExceeded
  46. }
  47. if p.blocked {
  48. p.cnd.Wait()
  49. return 0, nil
  50. }
  51. n, err := p.buf.Read(b)
  52. // err will either be nil or io.EOF.
  53. if err == io.EOF {
  54. if p.closed {
  55. return n, err
  56. }
  57. // Wait for something to change.
  58. p.cnd.Wait()
  59. }
  60. return n, nil
  61. }
  62. // Read implements io.Reader.
  63. // Once the buffer is drained (i.e. after Close), subsequent calls will
  64. // return io.EOF.
  65. func (p *Pipe) Read(b []byte) (n int, err error) {
  66. if debugPipe {
  67. orig := b
  68. defer func() {
  69. log.Printf("Pipe(%q).Read(%q) n=%d, err=%v", p.name, string(orig[:n]), n, err)
  70. }()
  71. }
  72. for n == 0 {
  73. n2, err := p.readOrBlock(b)
  74. if err != nil {
  75. return n2, err
  76. }
  77. n += n2
  78. }
  79. p.cnd.Signal()
  80. return n, nil
  81. }
  82. // writeOrBlock attempts to write to the buffer, if the buffer is full it will
  83. // block until there is a change.
  84. func (p *Pipe) writeOrBlock(b []byte) (int, error) {
  85. p.mu.Lock()
  86. defer p.mu.Unlock()
  87. if p.closed {
  88. return 0, net.ErrClosed
  89. }
  90. if !p.writeTimeout.IsZero() && !time.Now().Before(p.writeTimeout) {
  91. return 0, os.ErrDeadlineExceeded
  92. }
  93. if p.blocked {
  94. p.cnd.Wait()
  95. return 0, nil
  96. }
  97. // Optimistically we want to write the entire slice.
  98. n := len(b)
  99. if limit := p.maxBuf - p.buf.Len(); limit < n {
  100. // However, we don't have enough capacity to write everything.
  101. n = limit
  102. }
  103. if n == 0 {
  104. // Wait for something to change.
  105. p.cnd.Wait()
  106. return 0, nil
  107. }
  108. p.buf.Write(b[:n])
  109. p.cnd.Signal()
  110. return n, nil
  111. }
  112. // Write implements io.Writer.
  113. func (p *Pipe) Write(b []byte) (n int, err error) {
  114. if debugPipe {
  115. orig := b
  116. defer func() {
  117. log.Printf("Pipe(%q).Write(%q) n=%d, err=%v", p.name, string(orig), n, err)
  118. }()
  119. }
  120. for len(b) > 0 {
  121. n2, err := p.writeOrBlock(b)
  122. if err != nil {
  123. return n + n2, err
  124. }
  125. n += n2
  126. b = b[n2:]
  127. }
  128. return n, nil
  129. }
  130. // Close closes the pipe.
  131. func (p *Pipe) Close() error {
  132. p.mu.Lock()
  133. defer p.mu.Unlock()
  134. p.closed = true
  135. p.blocked = false
  136. if p.cancelWriteTimer != nil {
  137. p.cancelWriteTimer()
  138. p.cancelWriteTimer = nil
  139. }
  140. if p.cancelReadTimer != nil {
  141. p.cancelReadTimer()
  142. p.cancelReadTimer = nil
  143. }
  144. p.cnd.Broadcast()
  145. return nil
  146. }
  147. func (p *Pipe) deadlineTimer(t time.Time) func() {
  148. if t.IsZero() {
  149. return nil
  150. }
  151. if t.Before(time.Now()) {
  152. p.cnd.Broadcast()
  153. return nil
  154. }
  155. ctx, cancel := context.WithDeadline(context.Background(), t)
  156. go func() {
  157. <-ctx.Done()
  158. if ctx.Err() == context.DeadlineExceeded {
  159. p.cnd.Broadcast()
  160. }
  161. }()
  162. return cancel
  163. }
  164. // SetReadDeadline sets the deadline for future Read calls.
  165. func (p *Pipe) SetReadDeadline(t time.Time) error {
  166. p.mu.Lock()
  167. defer p.mu.Unlock()
  168. p.readTimeout = t
  169. // If we already have a deadline, cancel it and create a new one.
  170. if p.cancelReadTimer != nil {
  171. p.cancelReadTimer()
  172. p.cancelReadTimer = nil
  173. }
  174. p.cancelReadTimer = p.deadlineTimer(t)
  175. return nil
  176. }
  177. // SetWriteDeadline sets the deadline for future Write calls.
  178. func (p *Pipe) SetWriteDeadline(t time.Time) error {
  179. p.mu.Lock()
  180. defer p.mu.Unlock()
  181. p.writeTimeout = t
  182. // If we already have a deadline, cancel it and create a new one.
  183. if p.cancelWriteTimer != nil {
  184. p.cancelWriteTimer()
  185. p.cancelWriteTimer = nil
  186. }
  187. p.cancelWriteTimer = p.deadlineTimer(t)
  188. return nil
  189. }
  190. // Block will cause all calls to Read and Write to block until they either
  191. // timeout, are unblocked or the pipe is closed.
  192. func (p *Pipe) Block() error {
  193. p.mu.Lock()
  194. defer p.mu.Unlock()
  195. closed := p.closed
  196. blocked := p.blocked
  197. p.blocked = true
  198. if closed {
  199. return fmt.Errorf("memnet.Pipe(%q).Block: closed", p.name)
  200. }
  201. if blocked {
  202. return fmt.Errorf("memnet.Pipe(%q).Block: already blocked", p.name)
  203. }
  204. p.cnd.Broadcast()
  205. return nil
  206. }
  207. // Unblock will cause all blocked Read/Write calls to continue execution.
  208. func (p *Pipe) Unblock() error {
  209. p.mu.Lock()
  210. defer p.mu.Unlock()
  211. closed := p.closed
  212. blocked := p.blocked
  213. p.blocked = false
  214. if closed {
  215. return fmt.Errorf("memnet.Pipe(%q).Block: closed", p.name)
  216. }
  217. if !blocked {
  218. return fmt.Errorf("memnet.Pipe(%q).Block: already unblocked", p.name)
  219. }
  220. p.cnd.Broadcast()
  221. return nil
  222. }