multicast.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package beacon
  7. import (
  8. "errors"
  9. "fmt"
  10. "net"
  11. "time"
  12. "github.com/thejerf/suture"
  13. "golang.org/x/net/ipv6"
  14. )
  15. type Multicast struct {
  16. *suture.Supervisor
  17. addr *net.UDPAddr
  18. inbox chan []byte
  19. outbox chan recv
  20. mr *multicastReader
  21. mw *multicastWriter
  22. }
  23. func NewMulticast(addr string) *Multicast {
  24. m := &Multicast{
  25. Supervisor: suture.New("multicastBeacon", suture.Spec{
  26. // Don't retry too frenetically: an error to open a socket or
  27. // whatever is usually something that is either permanent or takes
  28. // a while to get solved...
  29. FailureThreshold: 2,
  30. FailureBackoff: 60 * time.Second,
  31. // Only log restarts in debug mode.
  32. Log: func(line string) {
  33. if debug {
  34. l.Debugln(line)
  35. }
  36. },
  37. }),
  38. inbox: make(chan []byte),
  39. outbox: make(chan recv, 16),
  40. }
  41. m.mr = &multicastReader{
  42. addr: addr,
  43. outbox: m.outbox,
  44. stop: make(chan struct{}),
  45. }
  46. m.Add(m.mr)
  47. m.mw = &multicastWriter{
  48. addr: addr,
  49. inbox: m.inbox,
  50. stop: make(chan struct{}),
  51. }
  52. m.Add(m.mw)
  53. return m
  54. }
  55. func (m *Multicast) Send(data []byte) {
  56. m.inbox <- data
  57. }
  58. func (m *Multicast) Recv() ([]byte, net.Addr) {
  59. recv := <-m.outbox
  60. return recv.data, recv.src
  61. }
  62. func (m *Multicast) Error() error {
  63. if err := m.mr.Error(); err != nil {
  64. return err
  65. }
  66. return m.mw.Error()
  67. }
  68. type multicastWriter struct {
  69. addr string
  70. inbox <-chan []byte
  71. errorHolder
  72. stop chan struct{}
  73. }
  74. func (w *multicastWriter) Serve() {
  75. if debug {
  76. l.Debugln(w, "starting")
  77. defer l.Debugln(w, "stopping")
  78. }
  79. gaddr, err := net.ResolveUDPAddr("udp6", w.addr)
  80. if err != nil {
  81. if debug {
  82. l.Debugln(err)
  83. }
  84. w.setError(err)
  85. return
  86. }
  87. conn, err := net.ListenPacket("udp6", ":0")
  88. if err != nil {
  89. if debug {
  90. l.Debugln(err)
  91. }
  92. w.setError(err)
  93. return
  94. }
  95. pconn := ipv6.NewPacketConn(conn)
  96. wcm := &ipv6.ControlMessage{
  97. HopLimit: 1,
  98. }
  99. for bs := range w.inbox {
  100. intfs, err := net.Interfaces()
  101. if err != nil {
  102. if debug {
  103. l.Debugln(err)
  104. }
  105. w.setError(err)
  106. return
  107. }
  108. success := 0
  109. for _, intf := range intfs {
  110. wcm.IfIndex = intf.Index
  111. pconn.SetWriteDeadline(time.Now().Add(time.Second))
  112. _, err = pconn.WriteTo(bs, wcm, gaddr)
  113. pconn.SetWriteDeadline(time.Time{})
  114. if err != nil {
  115. if debug {
  116. l.Debugln(err, "on write to", gaddr, intf.Name)
  117. }
  118. w.setError(err)
  119. continue
  120. }
  121. if debug {
  122. l.Debugf("sent %d bytes to %v on %s", len(bs), gaddr, intf.Name)
  123. }
  124. success++
  125. }
  126. if success > 0 {
  127. w.setError(nil)
  128. } else {
  129. if debug {
  130. l.Debugln(err)
  131. }
  132. w.setError(err)
  133. }
  134. }
  135. }
  136. func (w *multicastWriter) Stop() {
  137. close(w.stop)
  138. }
  139. func (w *multicastWriter) String() string {
  140. return fmt.Sprintf("multicastWriter@%p", w)
  141. }
  142. type multicastReader struct {
  143. addr string
  144. outbox chan<- recv
  145. errorHolder
  146. stop chan struct{}
  147. }
  148. func (r *multicastReader) Serve() {
  149. if debug {
  150. l.Debugln(r, "starting")
  151. defer l.Debugln(r, "stopping")
  152. }
  153. gaddr, err := net.ResolveUDPAddr("udp6", r.addr)
  154. if err != nil {
  155. if debug {
  156. l.Debugln(err)
  157. }
  158. r.setError(err)
  159. return
  160. }
  161. conn, err := net.ListenPacket("udp6", r.addr)
  162. if err != nil {
  163. if debug {
  164. l.Debugln(err)
  165. }
  166. r.setError(err)
  167. return
  168. }
  169. intfs, err := net.Interfaces()
  170. if err != nil {
  171. if debug {
  172. l.Debugln(err)
  173. }
  174. r.setError(err)
  175. return
  176. }
  177. pconn := ipv6.NewPacketConn(conn)
  178. joined := 0
  179. for _, intf := range intfs {
  180. err := pconn.JoinGroup(&intf, &net.UDPAddr{IP: gaddr.IP})
  181. if debug {
  182. if err != nil {
  183. l.Debugln("IPv6 join", intf.Name, "failed:", err)
  184. } else {
  185. l.Debugln("IPv6 join", intf.Name, "success")
  186. }
  187. }
  188. joined++
  189. }
  190. if joined == 0 {
  191. if debug {
  192. l.Debugln("no multicast interfaces available")
  193. }
  194. r.setError(errors.New("no multicast interfaces available"))
  195. return
  196. }
  197. bs := make([]byte, 65536)
  198. for {
  199. n, _, addr, err := pconn.ReadFrom(bs)
  200. if err != nil {
  201. if debug {
  202. l.Debugln(err)
  203. }
  204. r.setError(err)
  205. continue
  206. }
  207. if debug {
  208. l.Debugf("recv %d bytes from %s", n, addr)
  209. }
  210. c := make([]byte, n)
  211. copy(c, bs)
  212. select {
  213. case r.outbox <- recv{c, addr}:
  214. default:
  215. if debug {
  216. l.Debugln("dropping message")
  217. }
  218. }
  219. }
  220. }
  221. func (r *multicastReader) Stop() {
  222. close(r.stop)
  223. }
  224. func (r *multicastReader) String() string {
  225. return fmt.Sprintf("multicastReader@%p", r)
  226. }