beacon.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package beacon
  7. import (
  8. "context"
  9. "fmt"
  10. "net"
  11. "time"
  12. "github.com/thejerf/suture/v4"
  13. "github.com/syncthing/syncthing/lib/util"
  14. )
  15. type recv struct {
  16. data []byte
  17. src net.Addr
  18. }
  19. type Interface interface {
  20. suture.Service
  21. fmt.Stringer
  22. Send(data []byte)
  23. Recv() ([]byte, net.Addr)
  24. Error() error
  25. }
  26. type cast struct {
  27. *suture.Supervisor
  28. name string
  29. reader util.ServiceWithError
  30. writer util.ServiceWithError
  31. outbox chan recv
  32. inbox chan []byte
  33. stopped chan struct{}
  34. }
  35. // newCast creates a base object for multi- or broadcasting. Afterwards the
  36. // caller needs to set reader and writer with the addReader and addWriter
  37. // methods to get a functional implementation of Interface.
  38. func newCast(name string) *cast {
  39. spec := util.Spec()
  40. // Don't retry too frenetically: an error to open a socket or
  41. // whatever is usually something that is either permanent or takes
  42. // a while to get solved...
  43. spec.FailureThreshold = 2
  44. spec.FailureBackoff = 60 * time.Second
  45. // Only log restarts in debug mode.
  46. spec.EventHook = func(e suture.Event) {
  47. l.Debugln(e)
  48. }
  49. c := &cast{
  50. Supervisor: suture.New(name, spec),
  51. name: name,
  52. inbox: make(chan []byte),
  53. outbox: make(chan recv, 16),
  54. stopped: make(chan struct{}),
  55. }
  56. util.OnSupervisorDone(c.Supervisor, func() { close(c.stopped) })
  57. return c
  58. }
  59. func (c *cast) addReader(svc func(context.Context) error) {
  60. c.reader = c.createService(svc, "reader")
  61. c.Add(c.reader)
  62. }
  63. func (c *cast) addWriter(svc func(ctx context.Context) error) {
  64. c.writer = c.createService(svc, "writer")
  65. c.Add(c.writer)
  66. }
  67. func (c *cast) createService(svc func(context.Context) error, suffix string) util.ServiceWithError {
  68. return util.AsService(svc, fmt.Sprintf("%s/%s", c, suffix))
  69. }
  70. func (c *cast) String() string {
  71. return fmt.Sprintf("%s@%p", c.name, c)
  72. }
  73. func (c *cast) Send(data []byte) {
  74. select {
  75. case c.inbox <- data:
  76. case <-c.stopped:
  77. }
  78. }
  79. func (c *cast) Recv() ([]byte, net.Addr) {
  80. select {
  81. case recv := <-c.outbox:
  82. return recv.data, recv.src
  83. case <-c.stopped:
  84. }
  85. return nil, nil
  86. }
  87. func (c *cast) Error() error {
  88. if err := c.reader.Error(); err != nil {
  89. return err
  90. }
  91. return c.writer.Error()
  92. }