| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 | 
							- package nebula
 
- import (
 
- 	"sync"
 
- 	"time"
 
- )
 
// timerCacheMax is the upper bound on how many spent timer items are kept
// around for reuse instead of being left to the garbage collector.
const timerCacheMax = 50_000
 
- type TimerWheel[T any] struct {
 
- 	// Current tick
 
- 	current int
 
- 	// Cheat on finding the length of the wheel
 
- 	wheelLen int
 
- 	// Last time we ticked, since we are lazy ticking
 
- 	lastTick *time.Time
 
- 	// Durations of a tick and the entire wheel
 
- 	tickDuration  time.Duration
 
- 	wheelDuration time.Duration
 
- 	// The actual wheel which is just a set of singly linked lists, head/tail pointers
 
- 	wheel []*TimeoutList[T]
 
- 	// Singly linked list of items that have timed out of the wheel
 
- 	expired *TimeoutList[T]
 
- 	// Item cache to avoid garbage collect
 
- 	itemCache   *TimeoutItem[T]
 
- 	itemsCached int
 
- }
 
- type LockingTimerWheel[T any] struct {
 
- 	m sync.Mutex
 
- 	t *TimerWheel[T]
 
- }
 
- // TimeoutList Represents a tick in the wheel
 
- type TimeoutList[T any] struct {
 
- 	Head *TimeoutItem[T]
 
- 	Tail *TimeoutItem[T]
 
- }
 
// TimeoutItem is one node of a TimeoutList: a stored value plus the link to
// the node that follows it.
type TimeoutItem[T any] struct {
	// Item is the value being timed.
	Item T

	// Next is the following node in the same list, or nil at the end.
	Next *TimeoutItem[T]
}
 
- // NewTimerWheel Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
 
- // Purge must be called once per entry to actually remove anything
 
- // The TimerWheel does not handle concurrency on its own.
 
- // Locks around access to it must be used if multiple routines are manipulating it.
 
- func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
 
- 	//TODO provide an error
 
- 	//if min >= max {
 
- 	//	return nil
 
- 	//}
 
- 	// Round down and add 2 so we can have the smallest # of ticks in the wheel and still account for a full
 
- 	// max duration, even if our current tick is at the maximum position and the next item to be added is at maximum
 
- 	// timeout
 
- 	wLen := int((max / min) + 2)
 
- 	tw := TimerWheel[T]{
 
- 		wheelLen:      wLen,
 
- 		wheel:         make([]*TimeoutList[T], wLen),
 
- 		tickDuration:  min,
 
- 		wheelDuration: max,
 
- 		expired:       &TimeoutList[T]{},
 
- 	}
 
- 	for i := range tw.wheel {
 
- 		tw.wheel[i] = &TimeoutList[T]{}
 
- 	}
 
- 	return &tw
 
- }
 
- // NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
 
- func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {
 
- 	return &LockingTimerWheel[T]{
 
- 		t: NewTimerWheel[T](min, max),
 
- 	}
 
- }
 
- // Add will add an item to the wheel in its proper timeout.
 
- // Caller should Advance the wheel prior to ensure the proper slot is used.
 
- func (tw *TimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
 
- 	i := tw.findWheel(timeout)
 
- 	// Try to fetch off the cache
 
- 	ti := tw.itemCache
 
- 	if ti != nil {
 
- 		tw.itemCache = ti.Next
 
- 		tw.itemsCached--
 
- 		ti.Next = nil
 
- 	} else {
 
- 		ti = &TimeoutItem[T]{}
 
- 	}
 
- 	// Relink and return
 
- 	ti.Item = v
 
- 	if tw.wheel[i].Tail == nil {
 
- 		tw.wheel[i].Head = ti
 
- 		tw.wheel[i].Tail = ti
 
- 	} else {
 
- 		tw.wheel[i].Tail.Next = ti
 
- 		tw.wheel[i].Tail = ti
 
- 	}
 
- 	return ti
 
- }
 
- // Purge removes and returns the first available expired item from the wheel and the 2nd argument is true.
 
- // If no item is available then an empty T is returned and the 2nd argument is false.
 
- func (tw *TimerWheel[T]) Purge() (T, bool) {
 
- 	if tw.expired.Head == nil {
 
- 		var na T
 
- 		return na, false
 
- 	}
 
- 	ti := tw.expired.Head
 
- 	tw.expired.Head = ti.Next
 
- 	if tw.expired.Head == nil {
 
- 		tw.expired.Tail = nil
 
- 	}
 
- 	// Clear out the items references
 
- 	ti.Next = nil
 
- 	// Maybe cache it for later
 
- 	if tw.itemsCached < timerCacheMax {
 
- 		ti.Next = tw.itemCache
 
- 		tw.itemCache = ti
 
- 		tw.itemsCached++
 
- 	}
 
- 	return ti.Item, true
 
- }
 
- // findWheel find the next position in the wheel for the provided timeout given the current tick
 
- func (tw *TimerWheel[T]) findWheel(timeout time.Duration) (i int) {
 
- 	if timeout < tw.tickDuration {
 
- 		// Can't track anything below the set resolution
 
- 		timeout = tw.tickDuration
 
- 	} else if timeout > tw.wheelDuration {
 
- 		// We aren't handling timeouts greater than the wheels duration
 
- 		timeout = tw.wheelDuration
 
- 	}
 
- 	// Find the next highest, rounding up
 
- 	tick := int(((timeout - 1) / tw.tickDuration) + 1)
 
- 	// Add another tick since the current tick may almost be over then map it to the wheel from our
 
- 	// current position
 
- 	tick += tw.current + 1
 
- 	if tick >= tw.wheelLen {
 
- 		tick -= tw.wheelLen
 
- 	}
 
- 	return tick
 
- }
 
- // Advance will move the wheel forward by the appropriate number of ticks for the provided time and all items
 
- // passed over will be moved to the expired list. Calling Purge is necessary to remove them entirely.
 
- func (tw *TimerWheel[T]) Advance(now time.Time) {
 
- 	if tw.lastTick == nil {
 
- 		tw.lastTick = &now
 
- 	}
 
- 	// We want to round down
 
- 	ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)
 
- 	adv := ticks
 
- 	if ticks > tw.wheelLen {
 
- 		ticks = tw.wheelLen
 
- 	}
 
- 	for i := 0; i < ticks; i++ {
 
- 		tw.current++
 
- 		if tw.current >= tw.wheelLen {
 
- 			tw.current = 0
 
- 		}
 
- 		if tw.wheel[tw.current].Head != nil {
 
- 			// We need to append the expired items as to not starve evicting the oldest ones
 
- 			if tw.expired.Tail == nil {
 
- 				tw.expired.Head = tw.wheel[tw.current].Head
 
- 				tw.expired.Tail = tw.wheel[tw.current].Tail
 
- 			} else {
 
- 				tw.expired.Tail.Next = tw.wheel[tw.current].Head
 
- 				tw.expired.Tail = tw.wheel[tw.current].Tail
 
- 			}
 
- 			tw.wheel[tw.current].Head = nil
 
- 			tw.wheel[tw.current].Tail = nil
 
- 		}
 
- 	}
 
- 	// Advance the tick based on duration to avoid losing some accuracy
 
- 	newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))
 
- 	tw.lastTick = &newTick
 
- }
 
- func (lw *LockingTimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
 
- 	lw.m.Lock()
 
- 	defer lw.m.Unlock()
 
- 	return lw.t.Add(v, timeout)
 
- }
 
- func (lw *LockingTimerWheel[T]) Purge() (T, bool) {
 
- 	lw.m.Lock()
 
- 	defer lw.m.Unlock()
 
- 	return lw.t.Purge()
 
- }
 
- func (lw *LockingTimerWheel[T]) Advance(now time.Time) {
 
- 	lw.m.Lock()
 
- 	defer lw.m.Unlock()
 
- 	lw.t.Advance(now)
 
- }
 
 
  |