|
|
@@ -31,25 +31,21 @@ import (
|
|
|
)
|
|
|
|
|
|
type Discoverer struct {
|
|
|
- myID protocol.DeviceID
|
|
|
- listenAddrs []string
|
|
|
- localBcastIntv time.Duration
|
|
|
- localBcastStart time.Time
|
|
|
- globalBcastIntv time.Duration
|
|
|
- errorRetryIntv time.Duration
|
|
|
- cacheLifetime time.Duration
|
|
|
- broadcastBeacon beacon.Interface
|
|
|
- multicastBeacon beacon.Interface
|
|
|
- registry map[protocol.DeviceID][]CacheEntry
|
|
|
- registryLock sync.RWMutex
|
|
|
- extServers []string
|
|
|
- extPort uint16
|
|
|
- localBcastTick <-chan time.Time
|
|
|
- stopGlobal chan struct{}
|
|
|
- globalWG sync.WaitGroup
|
|
|
- forcedBcastTick chan time.Time
|
|
|
- extAnnounceOK map[string]bool
|
|
|
- extAnnounceOKmut sync.Mutex
|
|
|
+ myID protocol.DeviceID
|
|
|
+ listenAddrs []string
|
|
|
+ localBcastIntv time.Duration
|
|
|
+ localBcastStart time.Time
|
|
|
+ cacheLifetime time.Duration
|
|
|
+ broadcastBeacon beacon.Interface
|
|
|
+ multicastBeacon beacon.Interface
|
|
|
+ registry map[protocol.DeviceID][]CacheEntry
|
|
|
+ registryLock sync.RWMutex
|
|
|
+ extPort uint16
|
|
|
+ localBcastTick <-chan time.Time
|
|
|
+ forcedBcastTick chan time.Time
|
|
|
+
|
|
|
+ clients []Client
|
|
|
+ mut sync.RWMutex
|
|
|
}
|
|
|
|
|
|
type CacheEntry struct {
|
|
|
@@ -63,14 +59,11 @@ var (
|
|
|
|
|
|
func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer {
|
|
|
return &Discoverer{
|
|
|
- myID: id,
|
|
|
- listenAddrs: addresses,
|
|
|
- localBcastIntv: 30 * time.Second,
|
|
|
- globalBcastIntv: 1800 * time.Second,
|
|
|
- errorRetryIntv: 60 * time.Second,
|
|
|
- cacheLifetime: 5 * time.Minute,
|
|
|
- registry: make(map[protocol.DeviceID][]CacheEntry),
|
|
|
- extAnnounceOK: make(map[string]bool),
|
|
|
+ myID: id,
|
|
|
+ listenAddrs: addresses,
|
|
|
+ localBcastIntv: 30 * time.Second,
|
|
|
+ cacheLifetime: 5 * time.Minute,
|
|
|
+ registry: make(map[protocol.DeviceID][]CacheEntry),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -112,38 +105,60 @@ func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
|
|
|
}
|
|
|
|
|
|
func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
|
|
|
- // Wait for any previous announcer to stop before starting a new one.
|
|
|
- d.globalWG.Wait()
|
|
|
- d.extServers = servers
|
|
|
+ d.mut.Lock()
|
|
|
+ defer d.mut.Unlock()
|
|
|
+
|
|
|
+ if len(d.clients) > 0 {
|
|
|
+ d.stopGlobal()
|
|
|
+ }
|
|
|
+
|
|
|
d.extPort = extPort
|
|
|
- d.stopGlobal = make(chan struct{})
|
|
|
- d.globalWG.Add(1)
|
|
|
- go func() {
|
|
|
- defer d.globalWG.Done()
|
|
|
-
|
|
|
- buf := d.announcementPkt()
|
|
|
-
|
|
|
- for _, extServer := range d.extServers {
|
|
|
- d.globalWG.Add(1)
|
|
|
- go func(server string) {
|
|
|
- d.sendExternalAnnouncements(server, buf)
|
|
|
- d.globalWG.Done()
|
|
|
- }(extServer)
|
|
|
- }
|
|
|
- }()
|
|
|
+ pkt := d.announcementPkt()
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ clients := make(chan Client, len(servers))
|
|
|
+ for _, address := range servers {
|
|
|
+ wg.Add(1)
|
|
|
+ go func(addr string) {
|
|
|
+ defer wg.Done()
|
|
|
+ client, err := New(addr, pkt)
|
|
|
+ if err != nil {
|
|
|
+ l.Infoln("Error creating discovery client", addr, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ clients <- client
|
|
|
+ }(address)
|
|
|
+ }
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+ close(clients)
|
|
|
+
|
|
|
+ for client := range clients {
|
|
|
+ d.clients = append(d.clients, client)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (d *Discoverer) StopGlobal() {
|
|
|
- if d.stopGlobal != nil {
|
|
|
- close(d.stopGlobal)
|
|
|
- d.globalWG.Wait()
|
|
|
+ d.mut.Lock()
|
|
|
+ defer d.mut.Unlock()
|
|
|
+ d.stopGlobal()
|
|
|
+}
|
|
|
+
|
|
|
+func (d *Discoverer) stopGlobal() {
|
|
|
+ for _, client := range d.clients {
|
|
|
+ client.Stop()
|
|
|
}
|
|
|
+ d.clients = []Client{}
|
|
|
}
|
|
|
|
|
|
func (d *Discoverer) ExtAnnounceOK() map[string]bool {
|
|
|
- d.extAnnounceOKmut.Lock()
|
|
|
- defer d.extAnnounceOKmut.Unlock()
|
|
|
- return d.extAnnounceOK
|
|
|
+ d.mut.RLock()
|
|
|
+ defer d.mut.RUnlock()
|
|
|
+
|
|
|
+ ret := make(map[string]bool)
|
|
|
+ for _, client := range d.clients {
|
|
|
+ ret[client.Address()] = client.StatusOK()
|
|
|
+ }
|
|
|
+ return ret
|
|
|
}
|
|
|
|
|
|
func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
|
|
|
@@ -151,22 +166,47 @@ func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
|
|
|
cached := d.filterCached(d.registry[device])
|
|
|
d.registryLock.RUnlock()
|
|
|
|
|
|
+ d.mut.RLock()
|
|
|
+ defer d.mut.RUnlock()
|
|
|
+
|
|
|
+ var addrs []string
|
|
|
if len(cached) > 0 {
|
|
|
- addrs := make([]string, len(cached))
|
|
|
+ addrs = make([]string, len(cached))
|
|
|
for i := range cached {
|
|
|
addrs[i] = cached[i].Address
|
|
|
}
|
|
|
- return addrs
|
|
|
- } else if len(d.extServers) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
|
|
|
+ } else if len(d.clients) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
|
|
|
// Only perform external lookups if we have at least one external
|
|
|
- // server and one local announcement interval has passed. This is to
|
|
|
- // avoid finding local peers on their remote address at startup.
|
|
|
- addrs := d.externalLookup(device)
|
|
|
- cached = make([]CacheEntry, len(addrs))
|
|
|
- for i := range addrs {
|
|
|
- cached[i] = CacheEntry{
|
|
|
- Address: addrs[i],
|
|
|
- Seen: time.Now(),
|
|
|
+ // server client and one local announcement interval has passed. This is
|
|
|
+ // to avoid finding local peers on their remote address at startup.
|
|
|
+ results := make(chan []string, len(d.clients))
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ for _, client := range d.clients {
|
|
|
+ wg.Add(1)
|
|
|
+ go func(c Client) {
|
|
|
+ defer wg.Done()
|
|
|
+ results <- c.Lookup(device)
|
|
|
+ }(client)
|
|
|
+ }
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+ close(results)
|
|
|
+
|
|
|
+ cached := []CacheEntry{}
|
|
|
+ seen := make(map[string]struct{})
|
|
|
+ now := time.Now()
|
|
|
+
|
|
|
+ for result := range results {
|
|
|
+ for _, addr := range result {
|
|
|
+ _, ok := seen[addr]
|
|
|
+ if !ok {
|
|
|
+ cached = append(cached, CacheEntry{
|
|
|
+ Address: addr,
|
|
|
+ Seen: now,
|
|
|
+ })
|
|
|
+ seen[addr] = struct{}{}
|
|
|
+ addrs = append(addrs, addr)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -174,7 +214,7 @@ func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
|
|
|
d.registry[device] = cached
|
|
|
d.registryLock.Unlock()
|
|
|
}
|
|
|
- return nil
|
|
|
+ return addrs
|
|
|
}
|
|
|
|
|
|
func (d *Discoverer) Hint(device string, addrs []string) {
|
|
|
@@ -199,7 +239,7 @@ func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
|
|
|
return devices
|
|
|
}
|
|
|
|
|
|
-func (d *Discoverer) announcementPkt() []byte {
|
|
|
+func (d *Discoverer) announcementPkt() *Announce {
|
|
|
var addrs []Address
|
|
|
if d.extPort != 0 {
|
|
|
addrs = []Address{{Port: d.extPort}}
|
|
|
@@ -221,11 +261,10 @@ func (d *Discoverer) announcementPkt() []byte {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- var pkt = Announce{
|
|
|
+ return &Announce{
|
|
|
Magic: AnnouncementMagic,
|
|
|
This: Device{d.myID[:], addrs},
|
|
|
}
|
|
|
- return pkt.MustMarshalXDR()
|
|
|
}
|
|
|
|
|
|
func (d *Discoverer) sendLocalAnnouncements() {
|
|
|
@@ -252,80 +291,6 @@ func (d *Discoverer) sendLocalAnnouncements() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (d *Discoverer) sendExternalAnnouncements(extServer string, buf []byte) {
|
|
|
- timer := time.NewTimer(0)
|
|
|
-
|
|
|
- conn, err := net.ListenUDP("udp", nil)
|
|
|
- for err != nil {
|
|
|
- timer.Reset(d.errorRetryIntv)
|
|
|
- l.Warnf("Global discovery: %v; trying again in %v", err, d.errorRetryIntv)
|
|
|
- select {
|
|
|
- case <-d.stopGlobal:
|
|
|
- return
|
|
|
- case <-timer.C:
|
|
|
- }
|
|
|
- conn, err = net.ListenUDP("udp", nil)
|
|
|
- }
|
|
|
-
|
|
|
- remote, err := net.ResolveUDPAddr("udp", extServer)
|
|
|
- for err != nil {
|
|
|
- timer.Reset(d.errorRetryIntv)
|
|
|
- l.Warnf("Global discovery: %s: %v; trying again in %v", extServer, err, d.errorRetryIntv)
|
|
|
- select {
|
|
|
- case <-d.stopGlobal:
|
|
|
- return
|
|
|
- case <-timer.C:
|
|
|
- }
|
|
|
- remote, err = net.ResolveUDPAddr("udp", extServer)
|
|
|
- }
|
|
|
-
|
|
|
- // Delay the first announcement until after a full local announcement
|
|
|
- // cycle, to increase the chance of other peers finding us locally first.
|
|
|
- timer.Reset(d.localBcastIntv)
|
|
|
-
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-d.stopGlobal:
|
|
|
- return
|
|
|
-
|
|
|
- case <-timer.C:
|
|
|
- var ok bool
|
|
|
-
|
|
|
- if debug {
|
|
|
- l.Debugf("discover: send announcement -> %v\n%s", remote, hex.Dump(buf))
|
|
|
- }
|
|
|
-
|
|
|
- _, err := conn.WriteTo(buf, remote)
|
|
|
- if err != nil {
|
|
|
- if debug {
|
|
|
- l.Debugln("discover: %s: warning:", extServer, err)
|
|
|
- }
|
|
|
- ok = false
|
|
|
- } else {
|
|
|
- // Verify that the announce server responds positively for our device ID
|
|
|
-
|
|
|
- time.Sleep(1 * time.Second)
|
|
|
- res := d.externalLookupOnServer(extServer, d.myID)
|
|
|
-
|
|
|
- if debug {
|
|
|
- l.Debugln("discover:", extServer, "external lookup check:", res)
|
|
|
- }
|
|
|
- ok = len(res) > 0
|
|
|
- }
|
|
|
-
|
|
|
- d.extAnnounceOKmut.Lock()
|
|
|
- d.extAnnounceOK[extServer] = ok
|
|
|
- d.extAnnounceOKmut.Unlock()
|
|
|
-
|
|
|
- if ok {
|
|
|
- timer.Reset(d.globalBcastIntv)
|
|
|
- } else {
|
|
|
- timer.Reset(d.errorRetryIntv)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
|
|
|
for {
|
|
|
buf, addr := b.Recv()
|
|
|
@@ -406,104 +371,6 @@ func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool {
|
|
|
return len(current) > len(orig)
|
|
|
}
|
|
|
|
|
|
-func (d *Discoverer) externalLookup(device protocol.DeviceID) []string {
|
|
|
- // Buffer up to as many answers as we have servers to query.
|
|
|
- results := make(chan []string, len(d.extServers))
|
|
|
-
|
|
|
- // Query all servers.
|
|
|
- wg := sync.WaitGroup{}
|
|
|
- for _, extServer := range d.extServers {
|
|
|
- wg.Add(1)
|
|
|
- go func(server string) {
|
|
|
- result := d.externalLookupOnServer(server, device)
|
|
|
- if debug {
|
|
|
- l.Debugln("discover:", result, "from", server, "for", device)
|
|
|
- }
|
|
|
- results <- result
|
|
|
- wg.Done()
|
|
|
- }(extServer)
|
|
|
- }
|
|
|
-
|
|
|
- wg.Wait()
|
|
|
- close(results)
|
|
|
-
|
|
|
- addrs := []string{}
|
|
|
- for result := range results {
|
|
|
- addrs = append(addrs, result...)
|
|
|
- }
|
|
|
-
|
|
|
- return addrs
|
|
|
-}
|
|
|
-
|
|
|
-func (d *Discoverer) externalLookupOnServer(extServer string, device protocol.DeviceID) []string {
|
|
|
- extIP, err := net.ResolveUDPAddr("udp", extServer)
|
|
|
- if err != nil {
|
|
|
- if debug {
|
|
|
- l.Debugf("discover: %s: %v; no external lookup", extServer, err)
|
|
|
- }
|
|
|
- return nil
|
|
|
- }
|
|
|
-
|
|
|
- conn, err := net.DialUDP("udp", nil, extIP)
|
|
|
- if err != nil {
|
|
|
- if debug {
|
|
|
- l.Debugf("discover: %s: %v; no external lookup", extServer, err)
|
|
|
- }
|
|
|
- return nil
|
|
|
- }
|
|
|
- defer conn.Close()
|
|
|
-
|
|
|
- err = conn.SetDeadline(time.Now().Add(5 * time.Second))
|
|
|
- if err != nil {
|
|
|
- if debug {
|
|
|
- l.Debugf("discover: %s: %v; no external lookup", extServer, err)
|
|
|
- }
|
|
|
- return nil
|
|
|
- }
|
|
|
-
|
|
|
- buf := Query{QueryMagic, device[:]}.MustMarshalXDR()
|
|
|
- _, err = conn.Write(buf)
|
|
|
- if err != nil {
|
|
|
- if debug {
|
|
|
- l.Debugf("discover: %s: %v; no external lookup", extServer, err)
|
|
|
- }
|
|
|
- return nil
|
|
|
- }
|
|
|
-
|
|
|
- buf = make([]byte, 2048)
|
|
|
- n, err := conn.Read(buf)
|
|
|
- if err != nil {
|
|
|
- if err, ok := err.(net.Error); ok && err.Timeout() {
|
|
|
- // Expected if the server doesn't know about requested device ID
|
|
|
- return nil
|
|
|
- }
|
|
|
- if debug {
|
|
|
- l.Debugf("discover: %s: %v; no external lookup", extServer, err)
|
|
|
- }
|
|
|
- return nil
|
|
|
- }
|
|
|
-
|
|
|
- if debug {
|
|
|
- l.Debugf("discover: %s: read external:\n%s", extServer, hex.Dump(buf[:n]))
|
|
|
- }
|
|
|
-
|
|
|
- var pkt Announce
|
|
|
- err = pkt.UnmarshalXDR(buf[:n])
|
|
|
- if err != nil && err != io.EOF {
|
|
|
- if debug {
|
|
|
- l.Debugln("discover:", extServer, err)
|
|
|
- }
|
|
|
- return nil
|
|
|
- }
|
|
|
-
|
|
|
- var addrs []string
|
|
|
- for _, a := range pkt.This.Addresses {
|
|
|
- deviceAddr := net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
|
|
|
- addrs = append(addrs, deviceAddr)
|
|
|
- }
|
|
|
- return addrs
|
|
|
-}
|
|
|
-
|
|
|
func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry {
|
|
|
for i := 0; i < len(c); {
|
|
|
if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {
|