123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package gopool
- import (
- "sync"
- "errors"
- "fmt"
- )
// Sentinel errors returned by ChannelPool.LoadOrStore.
var (
	// ErrHandlerIsExist reports that the given key is already registered
	// in the pool (message: "the specified key already exists").
	ErrHandlerIsExist error = errors.New("指定的键已存在")

	// ErrWorkerChanClosed reports that the pool is marked closed and no
	// longer accepts handlers (message: "the queue is closed").
	ErrWorkerChanClosed error = errors.New("队列已关闭")
)
- type ChannelHandler func()
- type entry struct {
- handler ChannelHandler
- key string
- }
- type ChannelPool struct {
- maxWorkerNum int
- maxPoolNum int
- wait *sync.WaitGroup
- cache *sync.Map
- worker chan *entry
- limit chan bool
- isClosed bool
- once *sync.Once
- }
- func NewChannelPool(maxWorkerNum, maxPoolNum int) (*ChannelPool) {
- if maxWorkerNum <= 0 {
- maxWorkerNum = 1
- }
- if maxPoolNum <= 0 {
- maxWorkerNum = 100
- }
- return &ChannelPool{
- maxWorkerNum: maxWorkerNum,
- maxPoolNum: maxPoolNum,
- wait: &sync.WaitGroup{},
- cache: &sync.Map{},
- worker: make(chan *entry, maxWorkerNum),
- limit: make(chan bool, maxWorkerNum),
- isClosed: false,
- once: &sync.Once{},
- }
- }
- func (pool *ChannelPool) LoadOrStore(key string,value ChannelHandler) error {
- if pool.isClosed {
- return ErrWorkerChanClosed
- }
- if _,loaded := pool.cache.LoadOrStore(key,false); loaded {
- return ErrHandlerIsExist
- }else{
- pool.worker <- &entry{handler:value,key:key}
- return nil
- }
- }
- func (pool *ChannelPool) Start() {
- pool.once.Do(func() {
- go func() {
- for i :=0; i < pool.maxWorkerNum; i ++ {
- pool.limit <- true
- }
- for {
- actual, isClosed := <-pool.worker
- //当队列被关闭,则跳出循环
- if actual == nil && !isClosed {
- fmt.Println("工作队列已关闭")
- break
- }
- limit := <-pool.limit
- if limit {
- pool.wait.Add(1)
- go func(actual *entry) {
- defer func() {
- pool.cache.Delete(actual.key)
- pool.limit <- true
- pool.wait.Done()
- }()
- actual.handler()
- }(actual)
- }
- }
- }()
- })
- }
- func (pool *ChannelPool) Wait() {
- close(pool.worker)
- pool.wait.Wait()
- }
|