beacon.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package beacon
  7. import (
  8. "fmt"
  9. "net"
  10. "time"
  11. "github.com/thejerf/suture"
  12. "github.com/syncthing/syncthing/lib/util"
  13. )
  14. type recv struct {
  15. data []byte
  16. src net.Addr
  17. }
  18. type Interface interface {
  19. suture.Service
  20. fmt.Stringer
  21. Send(data []byte)
  22. Recv() ([]byte, net.Addr)
  23. Error() error
  24. }
  25. type cast struct {
  26. *suture.Supervisor
  27. name string
  28. reader util.ServiceWithError
  29. writer util.ServiceWithError
  30. outbox chan recv
  31. inbox chan []byte
  32. stopped chan struct{}
  33. }
  34. // newCast creates a base object for multi- or broadcasting. Afterwards the
  35. // caller needs to set reader and writer with the addReader and addWriter
  36. // methods to get a functional implementation of Interface.
  37. func newCast(name string) *cast {
  38. return &cast{
  39. Supervisor: suture.New(name, suture.Spec{
  40. // Don't retry too frenetically: an error to open a socket or
  41. // whatever is usually something that is either permanent or takes
  42. // a while to get solved...
  43. FailureThreshold: 2,
  44. FailureBackoff: 60 * time.Second,
  45. // Only log restarts in debug mode.
  46. Log: func(line string) {
  47. l.Debugln(line)
  48. },
  49. PassThroughPanics: true,
  50. }),
  51. name: name,
  52. inbox: make(chan []byte),
  53. outbox: make(chan recv, 16),
  54. stopped: make(chan struct{}),
  55. }
  56. }
  57. func (c *cast) addReader(svc func(chan struct{}) error) {
  58. c.reader = c.createService(svc, "reader")
  59. c.Add(c.reader)
  60. }
  61. func (c *cast) addWriter(svc func(stop chan struct{}) error) {
  62. c.writer = c.createService(svc, "writer")
  63. c.Add(c.writer)
  64. }
  65. func (c *cast) createService(svc func(chan struct{}) error, suffix string) util.ServiceWithError {
  66. return util.AsServiceWithError(func(stop chan struct{}) error {
  67. l.Debugln("Starting", c.name, suffix)
  68. err := svc(stop)
  69. l.Debugf("Stopped %v %v: %v", c.name, suffix, err)
  70. return err
  71. })
  72. }
  73. func (c *cast) Stop() {
  74. c.Supervisor.Stop()
  75. close(c.stopped)
  76. }
  77. func (c *cast) String() string {
  78. return fmt.Sprintf("%s@%p", c.name, c)
  79. }
  80. func (c *cast) Send(data []byte) {
  81. select {
  82. case c.inbox <- data:
  83. case <-c.stopped:
  84. }
  85. }
  86. func (c *cast) Recv() ([]byte, net.Addr) {
  87. select {
  88. case recv := <-c.outbox:
  89. return recv.data, recv.src
  90. case <-c.stopped:
  91. }
  92. return nil, nil
  93. }
  94. func (c *cast) Error() error {
  95. if err := c.reader.Error(); err != nil {
  96. return err
  97. }
  98. return c.writer.Error()
  99. }