|
|
@@ -164,11 +164,20 @@ func (l *Logger) Log(t EventType, data interface{}) {
|
|
|
func (l *Logger) Subscribe(mask EventType) *Subscription {
|
|
|
l.mutex.Lock()
|
|
|
dl.Debugln("subscribe", mask)
|
|
|
+
|
|
|
s := &Subscription{
|
|
|
mask: mask,
|
|
|
events: make(chan Event, BufferSize),
|
|
|
timeout: time.NewTimer(0),
|
|
|
}
|
|
|
+
|
|
|
+ // We need to create the timeout timer in the stopped, non-fired state so
|
|
|
+ // that Subscription.Poll() can safely reset it and select on the timeout
|
|
|
+ // channel. This ensures the timer is stopped and the channel drained.
|
|
|
+ if !s.timeout.Stop() {
|
|
|
+ <-s.timeout.C
|
|
|
+ }
|
|
|
+
|
|
|
l.subs = append(l.subs, s)
|
|
|
l.mutex.Unlock()
|
|
|
return s
|
|
|
@@ -196,19 +205,18 @@ func (l *Logger) Unsubscribe(s *Subscription) {
|
|
|
func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
|
|
|
dl.Debugln("poll", timeout)
|
|
|
|
|
|
- if !s.timeout.Reset(timeout) {
|
|
|
- select {
|
|
|
- case <-s.timeout.C:
|
|
|
- default:
|
|
|
- }
|
|
|
- }
|
|
|
+ s.timeout.Reset(timeout)
|
|
|
|
|
|
select {
|
|
|
case e, ok := <-s.events:
|
|
|
if !ok {
|
|
|
return e, ErrClosed
|
|
|
}
|
|
|
- s.timeout.Stop()
|
|
|
+ if !s.timeout.Stop() {
|
|
|
+ // The timeout must be stopped and possibly drained to be ready
|
|
|
+ // for reuse in the next call.
|
|
|
+ <-s.timeout.C
|
|
|
+ }
|
|
|
return e, nil
|
|
|
case <-s.timeout.C:
|
|
|
return Event{}, ErrTimeout
|