discover.go 9.4 KB


  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This program is free software: you can redistribute it and/or modify it
  4. // under the terms of the GNU General Public License as published by the Free
  5. // Software Foundation, either version 3 of the License, or (at your option)
  6. // any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful, but WITHOUT
  9. // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10. // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  11. // more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program. If not, see <http://www.gnu.org/licenses/>.
  15. package discover
  16. import (
  17. "bytes"
  18. "encoding/hex"
  19. "errors"
  20. "io"
  21. "net"
  22. "strconv"
  23. "sync"
  24. "time"
  25. "github.com/syncthing/syncthing/internal/beacon"
  26. "github.com/syncthing/syncthing/internal/events"
  27. "github.com/syncthing/syncthing/internal/protocol"
  28. )
  29. type Discoverer struct {
  30. myID protocol.DeviceID
  31. listenAddrs []string
  32. localBcastIntv time.Duration
  33. localBcastStart time.Time
  34. cacheLifetime time.Duration
  35. broadcastBeacon beacon.Interface
  36. multicastBeacon beacon.Interface
  37. registry map[protocol.DeviceID][]CacheEntry
  38. registryLock sync.RWMutex
  39. extPort uint16
  40. localBcastTick <-chan time.Time
  41. forcedBcastTick chan time.Time
  42. clients []Client
  43. mut sync.RWMutex
  44. }
  45. type CacheEntry struct {
  46. Address string
  47. Seen time.Time
  48. }
  49. var (
  50. ErrIncorrectMagic = errors.New("incorrect magic number")
  51. )
  52. func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer {
  53. return &Discoverer{
  54. myID: id,
  55. listenAddrs: addresses,
  56. localBcastIntv: 30 * time.Second,
  57. cacheLifetime: 5 * time.Minute,
  58. registry: make(map[protocol.DeviceID][]CacheEntry),
  59. }
  60. }
  61. func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
  62. if localPort > 0 {
  63. bb, err := beacon.NewBroadcast(localPort)
  64. if err != nil {
  65. if debug {
  66. l.Debugln(err)
  67. }
  68. l.Infoln("Local discovery over IPv4 unavailable")
  69. } else {
  70. d.broadcastBeacon = bb
  71. go d.recvAnnouncements(bb)
  72. }
  73. }
  74. if len(localMCAddr) > 0 {
  75. mb, err := beacon.NewMulticast(localMCAddr)
  76. if err != nil {
  77. if debug {
  78. l.Debugln(err)
  79. }
  80. l.Infoln("Local discovery over IPv6 unavailable")
  81. } else {
  82. d.multicastBeacon = mb
  83. go d.recvAnnouncements(mb)
  84. }
  85. }
  86. if d.broadcastBeacon == nil && d.multicastBeacon == nil {
  87. l.Warnln("Local discovery unavailable")
  88. } else {
  89. d.localBcastTick = time.Tick(d.localBcastIntv)
  90. d.forcedBcastTick = make(chan time.Time)
  91. d.localBcastStart = time.Now()
  92. go d.sendLocalAnnouncements()
  93. }
  94. }
  95. func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
  96. d.mut.Lock()
  97. defer d.mut.Unlock()
  98. if len(d.clients) > 0 {
  99. d.stopGlobal()
  100. }
  101. d.extPort = extPort
  102. pkt := d.announcementPkt()
  103. wg := sync.WaitGroup{}
  104. clients := make(chan Client, len(servers))
  105. for _, address := range servers {
  106. wg.Add(1)
  107. go func(addr string) {
  108. defer wg.Done()
  109. client, err := New(addr, pkt)
  110. if err != nil {
  111. l.Infoln("Error creating discovery client", addr, err)
  112. return
  113. }
  114. clients <- client
  115. }(address)
  116. }
  117. wg.Wait()
  118. close(clients)
  119. for client := range clients {
  120. d.clients = append(d.clients, client)
  121. }
  122. }
  123. func (d *Discoverer) StopGlobal() {
  124. d.mut.Lock()
  125. defer d.mut.Unlock()
  126. d.stopGlobal()
  127. }
  128. func (d *Discoverer) stopGlobal() {
  129. for _, client := range d.clients {
  130. client.Stop()
  131. }
  132. d.clients = []Client{}
  133. }
  134. func (d *Discoverer) ExtAnnounceOK() map[string]bool {
  135. d.mut.RLock()
  136. defer d.mut.RUnlock()
  137. ret := make(map[string]bool)
  138. for _, client := range d.clients {
  139. ret[client.Address()] = client.StatusOK()
  140. }
  141. return ret
  142. }
  143. func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
  144. d.registryLock.RLock()
  145. cached := d.filterCached(d.registry[device])
  146. d.registryLock.RUnlock()
  147. d.mut.RLock()
  148. defer d.mut.RUnlock()
  149. var addrs []string
  150. if len(cached) > 0 {
  151. addrs = make([]string, len(cached))
  152. for i := range cached {
  153. addrs[i] = cached[i].Address
  154. }
  155. } else if len(d.clients) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
  156. // Only perform external lookups if we have at least one external
  157. // server client and one local announcement interval has passed. This is
  158. // to avoid finding local peers on their remote address at startup.
  159. results := make(chan []string, len(d.clients))
  160. wg := sync.WaitGroup{}
  161. for _, client := range d.clients {
  162. wg.Add(1)
  163. go func(c Client) {
  164. defer wg.Done()
  165. results <- c.Lookup(device)
  166. }(client)
  167. }
  168. wg.Wait()
  169. close(results)
  170. cached := []CacheEntry{}
  171. seen := make(map[string]struct{})
  172. now := time.Now()
  173. for result := range results {
  174. for _, addr := range result {
  175. _, ok := seen[addr]
  176. if !ok {
  177. cached = append(cached, CacheEntry{
  178. Address: addr,
  179. Seen: now,
  180. })
  181. seen[addr] = struct{}{}
  182. addrs = append(addrs, addr)
  183. }
  184. }
  185. }
  186. d.registryLock.Lock()
  187. d.registry[device] = cached
  188. d.registryLock.Unlock()
  189. }
  190. return addrs
  191. }
  192. func (d *Discoverer) Hint(device string, addrs []string) {
  193. resAddrs := resolveAddrs(addrs)
  194. var id protocol.DeviceID
  195. id.UnmarshalText([]byte(device))
  196. d.registerDevice(nil, Device{
  197. Addresses: resAddrs,
  198. ID: id[:],
  199. })
  200. }
  201. func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
  202. d.registryLock.RLock()
  203. devices := make(map[protocol.DeviceID][]CacheEntry, len(d.registry))
  204. for device, addrs := range d.registry {
  205. addrsCopy := make([]CacheEntry, len(addrs))
  206. copy(addrsCopy, addrs)
  207. devices[device] = addrsCopy
  208. }
  209. d.registryLock.RUnlock()
  210. return devices
  211. }
  212. func (d *Discoverer) announcementPkt() *Announce {
  213. var addrs []Address
  214. if d.extPort != 0 {
  215. addrs = []Address{{Port: d.extPort}}
  216. } else {
  217. for _, astr := range d.listenAddrs {
  218. addr, err := net.ResolveTCPAddr("tcp", astr)
  219. if err != nil {
  220. l.Warnln("%v: not announcing %s", err, astr)
  221. continue
  222. } else if debug {
  223. l.Debugf("discover: announcing %s: %#v", astr, addr)
  224. }
  225. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  226. addrs = append(addrs, Address{Port: uint16(addr.Port)})
  227. } else if bs := addr.IP.To4(); bs != nil {
  228. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  229. } else if bs := addr.IP.To16(); bs != nil {
  230. addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
  231. }
  232. }
  233. }
  234. return &Announce{
  235. Magic: AnnouncementMagic,
  236. This: Device{d.myID[:], addrs},
  237. }
  238. }
  239. func (d *Discoverer) sendLocalAnnouncements() {
  240. var addrs = resolveAddrs(d.listenAddrs)
  241. var pkt = Announce{
  242. Magic: AnnouncementMagic,
  243. This: Device{d.myID[:], addrs},
  244. }
  245. msg := pkt.MustMarshalXDR()
  246. for {
  247. if d.multicastBeacon != nil {
  248. d.multicastBeacon.Send(msg)
  249. }
  250. if d.broadcastBeacon != nil {
  251. d.broadcastBeacon.Send(msg)
  252. }
  253. select {
  254. case <-d.localBcastTick:
  255. case <-d.forcedBcastTick:
  256. }
  257. }
  258. }
  259. func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
  260. for {
  261. buf, addr := b.Recv()
  262. if debug {
  263. l.Debugf("discover: read announcement from %s:\n%s", addr, hex.Dump(buf))
  264. }
  265. var pkt Announce
  266. err := pkt.UnmarshalXDR(buf)
  267. if err != nil && err != io.EOF {
  268. continue
  269. }
  270. var newDevice bool
  271. if bytes.Compare(pkt.This.ID, d.myID[:]) != 0 {
  272. newDevice = d.registerDevice(addr, pkt.This)
  273. }
  274. if newDevice {
  275. select {
  276. case d.forcedBcastTick <- time.Now():
  277. }
  278. }
  279. }
  280. }
  281. func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool {
  282. var id protocol.DeviceID
  283. copy(id[:], device.ID)
  284. d.registryLock.Lock()
  285. defer d.registryLock.Unlock()
  286. current := d.filterCached(d.registry[id])
  287. orig := current
  288. for _, a := range device.Addresses {
  289. var deviceAddr string
  290. if len(a.IP) > 0 {
  291. deviceAddr = net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
  292. } else if addr != nil {
  293. ua := addr.(*net.UDPAddr)
  294. ua.Port = int(a.Port)
  295. deviceAddr = ua.String()
  296. }
  297. for i := range current {
  298. if current[i].Address == deviceAddr {
  299. current[i].Seen = time.Now()
  300. goto done
  301. }
  302. }
  303. current = append(current, CacheEntry{
  304. Address: deviceAddr,
  305. Seen: time.Now(),
  306. })
  307. done:
  308. }
  309. if debug {
  310. l.Debugf("discover: register: %v -> %v", id, current)
  311. }
  312. d.registry[id] = current
  313. if len(current) > len(orig) {
  314. addrs := make([]string, len(current))
  315. for i := range current {
  316. addrs[i] = current[i].Address
  317. }
  318. events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
  319. "device": id.String(),
  320. "addrs": addrs,
  321. })
  322. }
  323. return len(current) > len(orig)
  324. }
  325. func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry {
  326. for i := 0; i < len(c); {
  327. if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {
  328. if debug {
  329. l.Debugf("removing cached address %s: seen %v ago", c[i].Address, ago)
  330. }
  331. c[i] = c[len(c)-1]
  332. c = c[:len(c)-1]
  333. } else {
  334. i++
  335. }
  336. }
  337. return c
  338. }
  339. func addrToAddr(addr *net.TCPAddr) Address {
  340. if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
  341. return Address{Port: uint16(addr.Port)}
  342. } else if bs := addr.IP.To4(); bs != nil {
  343. return Address{IP: bs, Port: uint16(addr.Port)}
  344. } else if bs := addr.IP.To16(); bs != nil {
  345. return Address{IP: bs, Port: uint16(addr.Port)}
  346. }
  347. return Address{}
  348. }
  349. func resolveAddrs(addrs []string) []Address {
  350. var raddrs []Address
  351. for _, addrStr := range addrs {
  352. addrRes, err := net.ResolveTCPAddr("tcp", addrStr)
  353. if err != nil {
  354. continue
  355. }
  356. addr := addrToAddr(addrRes)
  357. if len(addr.IP) > 0 {
  358. raddrs = append(raddrs, addr)
  359. } else {
  360. raddrs = append(raddrs, Address{Port: addr.Port})
  361. }
  362. }
  363. return raddrs
  364. }