policy_reader.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package source
  4. import (
  5. "errors"
  6. "fmt"
  7. "io"
  8. "slices"
  9. "sort"
  10. "sync"
  11. "time"
  12. "tailscale.com/util/mak"
  13. "tailscale.com/util/set"
  14. "tailscale.com/util/syspolicy/internal/loggerx"
  15. "tailscale.com/util/syspolicy/internal/metrics"
  16. "tailscale.com/util/syspolicy/setting"
  17. )
// Reader reads all configured policy settings from a given [Store].
// It registers a change callback with the [Store] and maintains the current version
// of the [setting.Snapshot] by lazily re-reading policy settings from the [Store]
// whenever a new settings snapshot is requested with [Reader.GetSettings].
// It is safe for concurrent use.
type Reader struct {
	// store is the underlying policy store; [Reader.Close] sets it to nil.
	store Store
	// origin is attached to every item and snapshot produced by this reader.
	origin *setting.Origin
	// settings holds the setting definitions obtained once in newReader.
	settings []*setting.Definition
	// unregisterChangeNotifier, if non-nil, removes the change callback
	// registered with the store.
	unregisterChangeNotifier func()
	doneCh chan struct{} // closed when [Reader] is closed.
	// mu is held whenever the fields below are accessed.
	mu sync.Mutex
	// closing is set by [Reader.Close]; once true, no new sessions are opened.
	closing bool
	// upToDate is cleared by a policy change notification and set again by reload.
	upToDate bool
	// lastPolicy is the most recently read snapshot.
	lastPolicy *setting.Snapshot
	// sessions holds the open sessions that are notified of policy changes.
	sessions set.HandleSet[*ReadingSession]
}
  35. // newReader returns a new [Reader] that reads policy settings from a given [Store].
  36. // The returned reader takes ownership of the store. If the store implements [io.Closer],
  37. // the returned reader will close the store when it is closed.
  38. func newReader(store Store, origin *setting.Origin) (*Reader, error) {
  39. settings, err := setting.Definitions()
  40. if err != nil {
  41. return nil, err
  42. }
  43. if expirable, ok := store.(Expirable); ok {
  44. select {
  45. case <-expirable.Done():
  46. return nil, ErrStoreClosed
  47. default:
  48. }
  49. }
  50. reader := &Reader{store: store, origin: origin, settings: settings, doneCh: make(chan struct{})}
  51. if changeable, ok := store.(Changeable); ok {
  52. // We should subscribe to policy change notifications first before reading
  53. // the policy settings from the store. This way we won't miss any notifications.
  54. if reader.unregisterChangeNotifier, err = changeable.RegisterChangeCallback(reader.onPolicyChange); err != nil {
  55. // Errors registering policy change callbacks are non-fatal.
  56. // TODO(nickkhyl): implement a background policy refresh every X minutes?
  57. loggerx.Errorf("failed to register %v policy change callback: %v", origin, err)
  58. }
  59. }
  60. if _, err := reader.reload(true); err != nil {
  61. if reader.unregisterChangeNotifier != nil {
  62. reader.unregisterChangeNotifier()
  63. }
  64. return nil, err
  65. }
  66. if expirable, ok := store.(Expirable); ok {
  67. if waitCh := expirable.Done(); waitCh != nil {
  68. go func() {
  69. select {
  70. case <-waitCh:
  71. reader.Close()
  72. case <-reader.doneCh:
  73. }
  74. }()
  75. }
  76. }
  77. return reader, nil
  78. }
  79. // GetSettings returns the current [*setting.Snapshot],
  80. // re-reading it from from the underlying [Store] only if the policy
  81. // has changed since it was read last. It never fails and returns
  82. // the previous version of the policy settings if a read attempt fails.
  83. func (r *Reader) GetSettings() *setting.Snapshot {
  84. r.mu.Lock()
  85. upToDate, lastPolicy := r.upToDate, r.lastPolicy
  86. r.mu.Unlock()
  87. if upToDate {
  88. return lastPolicy
  89. }
  90. policy, err := r.reload(false)
  91. if err != nil {
  92. // If the policy fails to reload completely, log an error and return the last cached version.
  93. // However, errors related to individual policy items are always
  94. // propagated to callers when they fetch those settings.
  95. loggerx.Errorf("failed to reload %v policy: %v", r.origin, err)
  96. }
  97. return policy
  98. }
// ReadSettings reads policy settings from the underlying [Store] even if no
// changes were detected. It returns the new [*setting.Snapshot] and a nil
// error on success, or an undefined snapshot (possibly nil) along with a
// non-nil error in case of failure.
func (r *Reader) ReadSettings() (*setting.Snapshot, error) {
	// Force a re-read regardless of whether the cached snapshot is up to date.
	return r.reload(true)
}
  106. // reload is like [Reader.ReadSettings], but allows specifying whether to re-read
  107. // an unchanged policy, and returns the last [*setting.Snapshot] if the read fails.
  108. func (r *Reader) reload(force bool) (*setting.Snapshot, error) {
  109. r.mu.Lock()
  110. defer r.mu.Unlock()
  111. if r.upToDate && !force {
  112. return r.lastPolicy, nil
  113. }
  114. if lockable, ok := r.store.(Lockable); ok {
  115. if err := lockable.Lock(); err != nil {
  116. return r.lastPolicy, err
  117. }
  118. defer lockable.Unlock()
  119. }
  120. r.upToDate = true
  121. metrics.Reset(r.origin)
  122. var m map[setting.Key]setting.RawItem
  123. if lastPolicyCount := r.lastPolicy.Len(); lastPolicyCount > 0 {
  124. m = make(map[setting.Key]setting.RawItem, lastPolicyCount)
  125. }
  126. for _, s := range r.settings {
  127. if !r.origin.Scope().IsConfigurableSetting(s) {
  128. // Skip settings that cannot be configured in the current scope.
  129. continue
  130. }
  131. val, err := readPolicySettingValue(r.store, s)
  132. if err != nil && (errors.Is(err, setting.ErrNoSuchKey) || errors.Is(err, setting.ErrNotConfigured)) {
  133. metrics.ReportNotConfigured(r.origin, s)
  134. continue
  135. }
  136. if err == nil {
  137. metrics.ReportConfigured(r.origin, s, val)
  138. } else {
  139. metrics.ReportError(r.origin, s, err)
  140. }
  141. // If there's an error reading a single policy, such as a value type mismatch,
  142. // we'll wrap the error to preserve its text and return it
  143. // whenever someone attempts to fetch the value.
  144. // Otherwise, the errorText will be nil.
  145. errorText := setting.MaybeErrorText(err)
  146. item := setting.RawItemWith(val, errorText, r.origin)
  147. mak.Set(&m, s.Key(), item)
  148. }
  149. newPolicy := setting.NewSnapshot(m, setting.SummaryWith(r.origin))
  150. if r.lastPolicy == nil || !newPolicy.EqualItems(r.lastPolicy) {
  151. r.lastPolicy = newPolicy
  152. }
  153. return r.lastPolicy, nil
  154. }
// ReadingSession is like [Reader], but with a channel that's written
// to when there's a policy change, and closed when the session is terminated.
type ReadingSession struct {
	// reader is the [Reader] this session was opened on.
	reader *Reader
	policyChangedCh chan struct{} // 1-buffered channel
	handle set.Handle // in the reader.sessions
	// closeInternal closes policyChangedCh; it is wrapped in sync.OnceFunc
	// by [Reader.OpenSession], so repeated calls are harmless.
	closeInternal func()
}
  163. // OpenSession opens and returns a new session to r, allowing the caller
  164. // to get notified whenever a policy change is reported by the [source.Store],
  165. // or an [ErrStoreClosed] if the reader has already been closed.
  166. func (r *Reader) OpenSession() (*ReadingSession, error) {
  167. session := &ReadingSession{
  168. reader: r,
  169. policyChangedCh: make(chan struct{}, 1),
  170. }
  171. session.closeInternal = sync.OnceFunc(func() { close(session.policyChangedCh) })
  172. r.mu.Lock()
  173. defer r.mu.Unlock()
  174. if r.closing {
  175. return nil, ErrStoreClosed
  176. }
  177. session.handle = r.sessions.Add(session)
  178. return session, nil
  179. }
// GetSettings is like [Reader.GetSettings]; it delegates to the session's reader.
func (s *ReadingSession) GetSettings() *setting.Snapshot {
	return s.reader.GetSettings()
}
// ReadSettings is like [Reader.ReadSettings]; it delegates to the session's reader.
func (s *ReadingSession) ReadSettings() (*setting.Snapshot, error) {
	return s.reader.ReadSettings()
}
// PolicyChanged returns a channel that's written to when
// there's a policy change, and closed when the session is terminated.
// The channel is 1-buffered, so several policy changes may result
// in a single pending notification.
func (s *ReadingSession) PolicyChanged() <-chan struct{} {
	return s.policyChangedCh
}
  193. // Close unregisters this session with the [Reader].
  194. func (s *ReadingSession) Close() {
  195. s.reader.mu.Lock()
  196. delete(s.reader.sessions, s.handle)
  197. s.closeInternal()
  198. s.reader.mu.Unlock()
  199. }
  200. // onPolicyChange handles a policy change notification from the [Store],
  201. // invalidating the current [setting.Snapshot] in r,
  202. // and notifying the active [ReadingSession]s.
  203. func (r *Reader) onPolicyChange() {
  204. r.mu.Lock()
  205. defer r.mu.Unlock()
  206. r.upToDate = false
  207. for _, s := range r.sessions {
  208. select {
  209. case s.policyChangedCh <- struct{}{}:
  210. // Notified.
  211. default:
  212. // 1-buffered channel is full, meaning that another policy change
  213. // notification is already en route.
  214. }
  215. }
  216. }
  217. // Close closes the store reader and the underlying store.
  218. func (r *Reader) Close() error {
  219. r.mu.Lock()
  220. if r.closing {
  221. r.mu.Unlock()
  222. return nil
  223. }
  224. r.closing = true
  225. r.mu.Unlock()
  226. if r.unregisterChangeNotifier != nil {
  227. r.unregisterChangeNotifier()
  228. r.unregisterChangeNotifier = nil
  229. }
  230. if closer, ok := r.store.(io.Closer); ok {
  231. if err := closer.Close(); err != nil {
  232. return err
  233. }
  234. }
  235. r.store = nil
  236. close(r.doneCh)
  237. r.mu.Lock()
  238. defer r.mu.Unlock()
  239. for _, c := range r.sessions {
  240. c.closeInternal()
  241. }
  242. r.sessions = nil
  243. return nil
  244. }
// Done returns a channel that is closed when the reader is closed.
// The channel is created together with the reader and is never written to.
func (r *Reader) Done() <-chan struct{} {
	return r.doneCh
}
// ReadableSource is a [Source] open for reading.
// It pairs the source with the [ReadingSession] used to read from it.
type ReadableSource struct {
	*Source
	*ReadingSession
}
// Close closes the underlying [ReadingSession].
// It does not close the [Source] or its reader.
func (s ReadableSource) Close() {
	s.ReadingSession.Close()
}
// ReadableSources is a slice of [ReadableSource].
// Methods with a pointer receiver modify the slice in place.
type ReadableSources []ReadableSource
  260. // Contains reports whether s contains the specified source.
  261. func (s ReadableSources) Contains(source *Source) bool {
  262. return s.IndexOf(source) != -1
  263. }
  264. // IndexOf returns position of the specified source in s, or -1
  265. // if the source does not exist.
  266. func (s ReadableSources) IndexOf(source *Source) int {
  267. return slices.IndexFunc(s, func(rs ReadableSource) bool {
  268. return rs.Source == source
  269. })
  270. }
  271. // InsertionIndexOf returns the position at which source can be inserted
  272. // to maintain the sorted order of the readableSources.
  273. // The return value is unspecified if s is not sorted on entry to InsertionIndexOf.
  274. func (s ReadableSources) InsertionIndexOf(source *Source) int {
  275. // Insert new sources after any existing sources with the same precedence,
  276. // and just before the first source with higher precedence.
  277. // Just like stable sort, but for insertion.
  278. // It's okay to use linear search as insertions are rare
  279. // and we never have more than just a few policy sources.
  280. higherPrecedence := func(rs ReadableSource) bool { return rs.Compare(source) > 0 }
  281. if i := slices.IndexFunc(s, higherPrecedence); i != -1 {
  282. return i
  283. }
  284. return len(s)
  285. }
  286. // StableSort sorts [ReadableSource] in s by precedence, so that policy
  287. // settings from sources with higher precedence (e.g., [DeviceScope])
  288. // will be read and merged last, overriding any policy settings with
  289. // the same keys configured in sources with lower precedence
  290. // (e.g., [CurrentUserScope]).
  291. func (s *ReadableSources) StableSort() {
  292. sort.SliceStable(*s, func(i, j int) bool {
  293. return (*s)[i].Source.Compare((*s)[j].Source) < 0
  294. })
  295. }
  296. // DeleteAt closes and deletes the i-th source from s.
  297. func (s *ReadableSources) DeleteAt(i int) {
  298. (*s)[i].Close()
  299. *s = slices.Delete(*s, i, i+1)
  300. }
  301. // Close closes and deletes all sources in s.
  302. func (s *ReadableSources) Close() {
  303. for _, s := range *s {
  304. s.Close()
  305. }
  306. *s = nil
  307. }
  308. func readPolicySettingValue(store Store, s *setting.Definition) (value any, err error) {
  309. switch key := s.Key(); s.Type() {
  310. case setting.BooleanValue:
  311. return store.ReadBoolean(key)
  312. case setting.IntegerValue:
  313. return store.ReadUInt64(key)
  314. case setting.StringValue:
  315. return store.ReadString(key)
  316. case setting.StringListValue:
  317. return store.ReadStringArray(key)
  318. case setting.PreferenceOptionValue:
  319. s, err := store.ReadString(key)
  320. if err == nil {
  321. var value setting.PreferenceOption
  322. if err = value.UnmarshalText([]byte(s)); err == nil {
  323. return value, nil
  324. }
  325. }
  326. return setting.ShowChoiceByPolicy, err
  327. case setting.VisibilityValue:
  328. s, err := store.ReadString(key)
  329. if err == nil {
  330. var value setting.Visibility
  331. if err = value.UnmarshalText([]byte(s)); err == nil {
  332. return value, nil
  333. }
  334. }
  335. return setting.VisibleByPolicy, err
  336. case setting.DurationValue:
  337. s, err := store.ReadString(key)
  338. if err == nil {
  339. var value time.Duration
  340. if value, err = time.ParseDuration(s); err == nil {
  341. return value, nil
  342. }
  343. }
  344. return nil, err
  345. default:
  346. return nil, fmt.Errorf("%w: unsupported setting type: %v", setting.ErrTypeMismatch, s.Type())
  347. }
  348. }