beacon.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package beacon
  7. import (
  8. "context"
  9. "fmt"
  10. "net"
  11. "time"
  12. "github.com/thejerf/suture"
  13. "github.com/syncthing/syncthing/lib/util"
  14. )
  // recv is one item travelling through a cast's outbox: a payload together
  // with the network address that accompanies it.
  type recv struct {
  	data []byte   // raw payload bytes
  	src  net.Addr // address associated with the payload
  }
  19. type Interface interface {
  20. suture.Service
  21. fmt.Stringer
  22. Send(data []byte)
  23. Recv() ([]byte, net.Addr)
  24. Error() error
  25. }
  26. type cast struct {
  27. *suture.Supervisor
  28. name string
  29. reader util.ServiceWithError
  30. writer util.ServiceWithError
  31. outbox chan recv
  32. inbox chan []byte
  33. stopped chan struct{}
  34. }
  35. // newCast creates a base object for multi- or broadcasting. Afterwards the
  36. // caller needs to set reader and writer with the addReader and addWriter
  37. // methods to get a functional implementation of Interface.
  38. func newCast(name string) *cast {
  39. return &cast{
  40. Supervisor: suture.New(name, suture.Spec{
  41. // Don't retry too frenetically: an error to open a socket or
  42. // whatever is usually something that is either permanent or takes
  43. // a while to get solved...
  44. FailureThreshold: 2,
  45. FailureBackoff: 60 * time.Second,
  46. // Only log restarts in debug mode.
  47. Log: func(line string) {
  48. l.Debugln(line)
  49. },
  50. PassThroughPanics: true,
  51. }),
  52. name: name,
  53. inbox: make(chan []byte),
  54. outbox: make(chan recv, 16),
  55. stopped: make(chan struct{}),
  56. }
  57. }
  58. func (c *cast) addReader(svc func(context.Context) error) {
  59. c.reader = c.createService(svc, "reader")
  60. c.Add(c.reader)
  61. }
  62. func (c *cast) addWriter(svc func(ctx context.Context) error) {
  63. c.writer = c.createService(svc, "writer")
  64. c.Add(c.writer)
  65. }
  66. func (c *cast) createService(svc func(context.Context) error, suffix string) util.ServiceWithError {
  67. return util.AsServiceWithError(func(ctx context.Context) error {
  68. l.Debugln("Starting", c.name, suffix)
  69. err := svc(ctx)
  70. l.Debugf("Stopped %v %v: %v", c.name, suffix, err)
  71. return err
  72. }, fmt.Sprintf("%s/%s", c, suffix))
  73. }
  74. func (c *cast) Stop() {
  75. c.Supervisor.Stop()
  76. close(c.stopped)
  77. }
  78. func (c *cast) String() string {
  79. return fmt.Sprintf("%s@%p", c.name, c)
  80. }
  81. func (c *cast) Send(data []byte) {
  82. select {
  83. case c.inbox <- data:
  84. case <-c.stopped:
  85. }
  86. }
  87. func (c *cast) Recv() ([]byte, net.Addr) {
  88. select {
  89. case recv := <-c.outbox:
  90. return recv.data, recv.src
  91. case <-c.stopped:
  92. }
  93. return nil, nil
  94. }
  95. func (c *cast) Error() error {
  96. if err := c.reader.Error(); err != nil {
  97. return err
  98. }
  99. return c.writer.Error()
  100. }