Browse Source

Mechanism to stop external announcement routine

Jakob Borg 11 years ago
parent
commit
798c4aef9a
1 changed files with 47 additions and 11 deletions
  1. 47 11
      discover/discover.go

+ 47 - 11
discover/discover.go

@@ -24,12 +24,15 @@ type Discoverer struct {
 	listenAddrs      []string
 	localBcastIntv   time.Duration
 	globalBcastIntv  time.Duration
+	errorRetryIntv   time.Duration
 	beacon           *beacon.Beacon
 	registry         map[protocol.NodeID][]string
 	registryLock     sync.RWMutex
 	extServer        string
 	extPort          uint16
 	localBcastTick   <-chan time.Time
+	stopGlobal       chan struct{}
+	globalWG         sync.WaitGroup
 	forcedBcastTick  chan time.Time
 	extAnnounceOK    bool
 	extAnnounceOKmut sync.Mutex
@@ -54,6 +57,7 @@ func NewDiscoverer(id protocol.NodeID, addresses []string, localPort int) (*Disc
 		listenAddrs:     addresses,
 		localBcastIntv:  30 * time.Second,
 		globalBcastIntv: 1800 * time.Second,
+		errorRetryIntv:  60 * time.Second,
 		beacon:          b,
 		registry:        make(map[protocol.NodeID][]string),
 	}
@@ -70,11 +74,20 @@ func (d *Discoverer) StartLocal() {
 }
 
 func (d *Discoverer) StartGlobal(server string, extPort uint16) {
+	// Wait for any previous announcer to stop before starting a new one.
+	d.globalWG.Wait()
 	d.extServer = server
 	d.extPort = extPort
+	d.stopGlobal = make(chan struct{})
+	d.globalWG.Add(1)
 	go d.sendExternalAnnouncements()
 }
 
+func (d *Discoverer) StopGlobal() {
+	close(d.stopGlobal)
+	d.globalWG.Wait()
+}
+
 func (d *Discoverer) ExtAnnounceOK() bool {
 	d.extAnnounceOKmut.Lock()
 	defer d.extAnnounceOKmut.Unlock()
@@ -173,20 +186,19 @@ func (d *Discoverer) sendLocalAnnouncements() {
 }
 
 func (d *Discoverer) sendExternalAnnouncements() {
-	// this should go in the Discoverer struct
-	errorRetryIntv := 60 * time.Second
+	defer d.globalWG.Done()
 
 	remote, err := net.ResolveUDPAddr("udp", d.extServer)
 	for err != nil {
-		l.Warnf("Global discovery: %v; trying again in %v", err, errorRetryIntv)
-		time.Sleep(errorRetryIntv)
+		l.Warnf("Global discovery: %v; trying again in %v", err, d.errorRetryIntv)
+		time.Sleep(d.errorRetryIntv)
 		remote, err = net.ResolveUDPAddr("udp", d.extServer)
 	}
 
 	conn, err := net.ListenUDP("udp", nil)
 	for err != nil {
-		l.Warnf("Global discovery: %v; trying again in %v", err, errorRetryIntv)
-		time.Sleep(errorRetryIntv)
+		l.Warnf("Global discovery: %v; trying again in %v", err, d.errorRetryIntv)
+		time.Sleep(d.errorRetryIntv)
 		conn, err = net.ListenUDP("udp", nil)
 	}
 
@@ -201,7 +213,10 @@ func (d *Discoverer) sendExternalAnnouncements() {
 		buf = d.announcementPkt()
 	}
 
-	for {
+	var bcastTick = time.Tick(d.globalBcastIntv)
+	var errTick <-chan time.Time
+
+	sendOneAnnouncement := func() {
 		var ok bool
 
 		if debug {
@@ -230,11 +245,32 @@ func (d *Discoverer) sendExternalAnnouncements() {
 		d.extAnnounceOKmut.Unlock()
 
 		if ok {
-			time.Sleep(d.globalBcastIntv)
-		} else {
-			time.Sleep(errorRetryIntv)
+			errTick = nil
+		} else if errTick != nil {
+			errTick = time.Tick(d.errorRetryIntv)
+		}
+	}
+
+	// Announce once, immediately
+	sendOneAnnouncement()
+
+loop:
+	for {
+		select {
+		case <-d.stopGlobal:
+			break loop
+
+		case <-errTick:
+			sendOneAnnouncement()
+
+		case <-bcastTick:
+			sendOneAnnouncement()
 		}
 	}
+
+	if debug {
+		l.Debugln("discover: stopping global")
+	}
 }
 
 func (d *Discoverer) recvAnnouncements() {
@@ -295,7 +331,7 @@ func (d *Discoverer) registerNode(addr net.Addr, node Node) bool {
 		}
 	}
 	if debug {
-		l.Debugf("discover: register: %s -> %#v", node.ID, addrs)
+		l.Debugf("discover: register: %v -> %#v", node.ID, addrs)
 	}
 	var id protocol.NodeID
 	copy(id[:], node.ID)