service_map.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. // Copyright (C) 2023 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "errors"
  10. "fmt"
  11. "time"
  12. "github.com/syncthing/syncthing/lib/events"
  13. "github.com/syncthing/syncthing/lib/svcutil"
  14. "github.com/thejerf/suture/v4"
  15. )
// errSvcNotFound is the error delivered by StopAndWaitChan (and therefore
// also by RemoveAndWaitChan and RemoveAndWait) when no service token is
// registered for the requested key.
var errSvcNotFound = errors.New("service not found")
// A serviceMap is a utility map of arbitrary keys to a suture.Service of
// some kind, where adding and removing services ensures they are properly
// started and stopped on an internal Supervisor. The serviceMap is itself a
// suture.Service and should be added to a Supervisor.
// Not safe for concurrent use.
type serviceMap[K comparable, S suture.Service] struct {
	// services holds the service stored under each key; this is what Get
	// and Each read.
	services map[K]S

	// tokens holds, per key, the token the supervisor returned when the
	// service was added. It is what the service is later removed by.
	tokens map[K]suture.ServiceToken

	// supervisor is the internal supervisor that services are added to and
	// removed from; Serve runs it.
	supervisor *suture.Supervisor

	// eventLogger is the logger handed to newServiceMap.
	// NOTE(review): no method shown here reads it after construction —
	// confirm whether it is still used elsewhere.
	eventLogger events.Logger
}
  28. func newServiceMap[K comparable, S suture.Service](eventLogger events.Logger) *serviceMap[K, S] {
  29. m := &serviceMap[K, S]{
  30. services: make(map[K]S),
  31. tokens: make(map[K]suture.ServiceToken),
  32. eventLogger: eventLogger,
  33. }
  34. m.supervisor = suture.New(m.String(), svcutil.SpecWithDebugLogger())
  35. return m
  36. }
  37. // Add adds a service to the map, starting it on the supervisor. If there is
  38. // already a service at the given key, it is removed first.
  39. func (s *serviceMap[K, S]) Add(k K, v S) {
  40. if tok, ok := s.tokens[k]; ok {
  41. // There is already a service at this key, remove it first.
  42. s.supervisor.Remove(tok)
  43. }
  44. s.services[k] = v
  45. s.tokens[k] = s.supervisor.Add(v)
  46. }
  47. // Get returns the service at the given key, or the empty value and false if
  48. // there is no service at that key.
  49. func (s *serviceMap[K, S]) Get(k K) (v S, ok bool) {
  50. v, ok = s.services[k]
  51. return
  52. }
  53. // Stop removes the service at the given key from the supervisor, stopping it.
  54. // The service itself is still retained, i.e. a call to Get with the same key
  55. // will still return a result.
  56. func (s *serviceMap[K, S]) Stop(k K) {
  57. if tok, ok := s.tokens[k]; ok {
  58. s.supervisor.Remove(tok)
  59. }
  60. }
  61. // StopAndWaitChan removes the service at the given key from the supervisor,
  62. // stopping it. The service itself is still retained, i.e. a call to Get with
  63. // the same key will still return a result.
  64. // The returned channel will produce precisely one error value: either the
  65. // return value from RemoveAndWait (possibly nil), or errSvcNotFound if the
  66. // service was not found.
  67. func (s *serviceMap[K, S]) StopAndWaitChan(k K, timeout time.Duration) <-chan error {
  68. ret := make(chan error, 1)
  69. if tok, ok := s.tokens[k]; ok {
  70. go func() {
  71. ret <- s.supervisor.RemoveAndWait(tok, timeout)
  72. }()
  73. } else {
  74. ret <- errSvcNotFound
  75. }
  76. return ret
  77. }
  78. // Remove removes the service at the given key, stopping it on the supervisor.
  79. // If there is no service at the given key, nothing happens. The return value
  80. // indicates whether a service was removed.
  81. func (s *serviceMap[K, S]) Remove(k K) (found bool) {
  82. if tok, ok := s.tokens[k]; ok {
  83. found = true
  84. s.supervisor.Remove(tok)
  85. } else {
  86. _, found = s.services[k]
  87. }
  88. delete(s.services, k)
  89. delete(s.tokens, k)
  90. return
  91. }
  92. // RemoveAndWait removes the service at the given key, stopping it on the
  93. // supervisor. Returns errSvcNotFound if there is no service at the given
  94. // key, otherwise the return value from the supervisor's RemoveAndWait.
  95. func (s *serviceMap[K, S]) RemoveAndWait(k K, timeout time.Duration) error {
  96. return <-s.RemoveAndWaitChan(k, timeout)
  97. }
  98. // RemoveAndWaitChan removes the service at the given key, stopping it on
  99. // the supervisor. The returned channel will produce precisely one error
  100. // value: either the return value from RemoveAndWait (possibly nil), or
  101. // errSvcNotFound if the service was not found.
  102. func (s *serviceMap[K, S]) RemoveAndWaitChan(k K, timeout time.Duration) <-chan error {
  103. ret := s.StopAndWaitChan(k, timeout)
  104. delete(s.services, k)
  105. return ret
  106. }
  107. // Each calls the given function for each service in the map. An error from
  108. // fn will stop the iteration and be returned as-is.
  109. func (s *serviceMap[K, S]) Each(fn func(K, S) error) error {
  110. for key, svc := range s.services {
  111. if err := fn(key, svc); err != nil {
  112. return err
  113. }
  114. }
  115. return nil
  116. }
  117. // Suture implementation
// Serve runs the internal supervisor with the given context and returns
// whatever the supervisor's Serve returns. This is the method through
// which the serviceMap itself acts as a suture.Service.
func (s *serviceMap[K, S]) Serve(ctx context.Context) error {
	return s.supervisor.Serve(ctx)
}
  121. func (s *serviceMap[K, S]) String() string {
  122. var kv K
  123. var sv S
  124. return fmt.Sprintf("serviceMap[%T, %T]@%p", kv, sv, s)
  125. }