pool.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package guerrilla
  2. import (
  3. "errors"
  4. "github.com/flashmob/go-guerrilla/log"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. var (
  11. ErrPoolShuttingDown = errors.New("server pool: shutting down")
  12. )
  13. // a struct can be pooled if it has the following interface
  14. type Poolable interface {
  15. // ability to set read/write timeout
  16. setTimeout(t time.Duration)
  17. // set a new connection and client id
  18. init(c net.Conn, clientID uint64)
  19. // get a unique id
  20. getID() uint64
  21. }
  22. // Pool holds Clients.
  23. type Pool struct {
  24. // clients that are ready to be borrowed
  25. pool chan Poolable
  26. // semaphore to control number of maximum borrowed clients
  27. sem chan bool
  28. // book-keeping of clients that have been lent
  29. activeClients lentClients
  30. isShuttingDownFlg atomic.Value
  31. poolGuard sync.Mutex
  32. ShutdownChan chan int
  33. }
  34. type lentClients struct {
  35. m map[uint64]Poolable
  36. mu sync.Mutex // guards access to this struct
  37. wg sync.WaitGroup
  38. }
  39. // maps the callback on all lentClients
  40. func (c *lentClients) mapAll(callback func(p Poolable)) {
  41. defer c.mu.Unlock()
  42. c.mu.Lock()
  43. for _, item := range c.m {
  44. callback(item)
  45. }
  46. }
  47. // operation performs an operation on a Poolable item using the callback
  48. func (c *lentClients) operation(callback func(p Poolable), item Poolable) {
  49. defer c.mu.Unlock()
  50. c.mu.Lock()
  51. callback(item)
  52. }
  53. // NewPool creates a new pool of Clients.
  54. func NewPool(poolSize int) *Pool {
  55. return &Pool{
  56. pool: make(chan Poolable, poolSize),
  57. sem: make(chan bool, poolSize),
  58. activeClients: lentClients{m: make(map[uint64]Poolable, poolSize)},
  59. ShutdownChan: make(chan int, 1),
  60. }
  61. }
  62. func (p *Pool) Start() {
  63. p.isShuttingDownFlg.Store(true)
  64. }
  65. // Lock the pool from borrowing then remove all active clients
  66. // each active client's timeout is lowered to 1 sec and notified
  67. // to stop accepting commands
  68. func (p *Pool) ShutdownState() {
  69. const aVeryLowTimeout = 1
  70. p.poolGuard.Lock() // ensure no other thread is in the borrowing now
  71. defer p.poolGuard.Unlock()
  72. p.isShuttingDownFlg.Store(true) // no more borrowing
  73. p.ShutdownChan <- 1 // release any waiting p.sem
  74. // set a low timeout
  75. p.activeClients.mapAll(func(p Poolable) {
  76. p.setTimeout(time.Duration(int64(aVeryLowTimeout)))
  77. })
  78. }
  79. func (p *Pool) ShutdownWait() {
  80. p.poolGuard.Lock() // ensure no other thread is in the borrowing now
  81. defer p.poolGuard.Unlock()
  82. p.activeClients.wg.Wait() // wait for clients to finish
  83. if len(p.ShutdownChan) > 0 {
  84. // drain
  85. <-p.ShutdownChan
  86. }
  87. p.isShuttingDownFlg.Store(false)
  88. }
  89. // returns true if the pool is shutting down
  90. func (p *Pool) IsShuttingDown() bool {
  91. if value, ok := p.isShuttingDownFlg.Load().(bool); ok {
  92. return value
  93. }
  94. return false
  95. }
  96. // set a timeout for all lent clients
  97. func (p *Pool) SetTimeout(duration time.Duration) {
  98. p.activeClients.mapAll(func(p Poolable) {
  99. p.setTimeout(duration)
  100. })
  101. }
  102. // Gets the number of active clients that are currently
  103. // out of the pool and busy serving
  104. func (p *Pool) GetActiveClientsCount() int {
  105. return len(p.sem)
  106. }
  107. // Borrow a Client from the pool. Will block if len(activeClients) > maxClients
  108. func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger) (Poolable, error) {
  109. p.poolGuard.Lock()
  110. defer p.poolGuard.Unlock()
  111. var c Poolable
  112. if yes, really := p.isShuttingDownFlg.Load().(bool); yes && really {
  113. // pool is shutting down.
  114. return c, ErrPoolShuttingDown
  115. }
  116. select {
  117. case p.sem <- true: // block the client from serving until there is room
  118. select {
  119. case c = <-p.pool:
  120. c.init(conn, clientID)
  121. default:
  122. c = NewClient(conn, clientID, logger)
  123. }
  124. p.activeClientsAdd(c)
  125. case <-p.ShutdownChan: // unblock p.sem when shutting down
  126. // pool is shutting down.
  127. return c, ErrPoolShuttingDown
  128. }
  129. return c, nil
  130. }
  131. // Return returns a Client back to the pool.
  132. func (p *Pool) Return(c Poolable) {
  133. select {
  134. case p.pool <- c:
  135. default:
  136. // hasta la vista, baby...
  137. }
  138. p.activeClientsRemove(c)
  139. <-p.sem // make room for the next serving client
  140. }
  141. func (p *Pool) activeClientsAdd(c Poolable) {
  142. p.activeClients.operation(func(item Poolable) {
  143. p.activeClients.wg.Add(1)
  144. p.activeClients.m[c.getID()] = item
  145. }, c)
  146. }
  147. func (p *Pool) activeClientsRemove(c Poolable) {
  148. p.activeClients.operation(func(item Poolable) {
  149. delete(p.activeClients.m, item.getID())
  150. p.activeClients.wg.Done()
  151. }, c)
  152. }