gopool.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package gopool
  2. import (
  3. "sync"
  4. "errors"
  5. "fmt"
  6. )
  7. var (
  8. ErrHandlerIsExist = errors.New("指定的键已存在")
  9. ErrWorkerChanClosed = errors.New("队列已关闭")
  10. )
  11. type ChannelHandler func()
  12. type entry struct {
  13. handler ChannelHandler
  14. key string
  15. }
  16. type ChannelPool struct {
  17. maxWorkerNum int
  18. maxPoolNum int
  19. wait *sync.WaitGroup
  20. cache *sync.Map
  21. worker chan *entry
  22. limit chan bool
  23. isClosed bool
  24. once *sync.Once
  25. }
  26. func NewChannelPool(maxWorkerNum, maxPoolNum int) (*ChannelPool) {
  27. if maxWorkerNum <= 0 {
  28. maxWorkerNum = 1
  29. }
  30. if maxPoolNum <= 0 {
  31. maxWorkerNum = 100
  32. }
  33. return &ChannelPool{
  34. maxWorkerNum: maxWorkerNum,
  35. maxPoolNum: maxPoolNum,
  36. wait: &sync.WaitGroup{},
  37. cache: &sync.Map{},
  38. worker: make(chan *entry, maxWorkerNum),
  39. limit: make(chan bool, maxWorkerNum),
  40. isClosed: false,
  41. once: &sync.Once{},
  42. }
  43. }
  44. func (pool *ChannelPool) LoadOrStore(key string,value ChannelHandler) error {
  45. if pool.isClosed {
  46. return ErrWorkerChanClosed
  47. }
  48. if _,loaded := pool.cache.LoadOrStore(key,false); loaded {
  49. return ErrHandlerIsExist
  50. }else{
  51. pool.worker <- &entry{handler:value,key:key}
  52. return nil
  53. }
  54. }
  55. func (pool *ChannelPool) Start() {
  56. pool.once.Do(func() {
  57. go func() {
  58. for i :=0; i < pool.maxWorkerNum; i ++ {
  59. pool.limit <- true
  60. }
  61. for {
  62. actual, isClosed := <-pool.worker
  63. //当队列被关闭,则跳出循环
  64. if actual == nil && !isClosed {
  65. fmt.Println("工作队列已关闭")
  66. break
  67. }
  68. limit := <-pool.limit
  69. if limit {
  70. pool.wait.Add(1)
  71. go func(actual *entry) {
  72. defer func() {
  73. pool.cache.Delete(actual.key)
  74. pool.limit <- true
  75. pool.wait.Done()
  76. }()
  77. actual.handler()
  78. }(actual)
  79. }
  80. }
  81. }()
  82. })
  83. }
  84. func (pool *ChannelPool) Wait() {
  85. close(pool.worker)
  86. pool.wait.Wait()
  87. }