// policy_reader.go (11 KB)
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package source
  4. import (
  5. "errors"
  6. "fmt"
  7. "io"
  8. "slices"
  9. "sort"
  10. "sync"
  11. "time"
  12. "tailscale.com/util/mak"
  13. "tailscale.com/util/set"
  14. "tailscale.com/util/syspolicy/internal/loggerx"
  15. "tailscale.com/util/syspolicy/internal/metrics"
  16. "tailscale.com/util/syspolicy/pkey"
  17. "tailscale.com/util/syspolicy/setting"
  18. )
  19. // Reader reads all configured policy settings from a given [Store].
  20. // It registers a change callback with the [Store] and maintains the current version
  21. // of the [setting.Snapshot] by lazily re-reading policy settings from the [Store]
  22. // whenever a new settings snapshot is requested with [Reader.GetSettings].
  23. // It is safe for concurrent use.
  24. type Reader struct {
  25. store Store
  26. origin *setting.Origin
  27. settings []*setting.Definition
  28. unregisterChangeNotifier func()
  29. doneCh chan struct{} // closed when [Reader] is closed.
  30. mu sync.Mutex
  31. closing bool
  32. upToDate bool
  33. lastPolicy *setting.Snapshot
  34. sessions set.HandleSet[*ReadingSession]
  35. }
  36. // newReader returns a new [Reader] that reads policy settings from a given [Store].
  37. // The returned reader takes ownership of the store. If the store implements [io.Closer],
  38. // the returned reader will close the store when it is closed.
  39. func newReader(store Store, origin *setting.Origin) (*Reader, error) {
  40. settings, err := setting.Definitions()
  41. if err != nil {
  42. return nil, err
  43. }
  44. if expirable, ok := store.(Expirable); ok {
  45. select {
  46. case <-expirable.Done():
  47. return nil, ErrStoreClosed
  48. default:
  49. }
  50. }
  51. reader := &Reader{store: store, origin: origin, settings: settings, doneCh: make(chan struct{})}
  52. if changeable, ok := store.(Changeable); ok {
  53. // We should subscribe to policy change notifications first before reading
  54. // the policy settings from the store. This way we won't miss any notifications.
  55. if reader.unregisterChangeNotifier, err = changeable.RegisterChangeCallback(reader.onPolicyChange); err != nil {
  56. // Errors registering policy change callbacks are non-fatal.
  57. // TODO(nickkhyl): implement a background policy refresh every X minutes?
  58. loggerx.Errorf("failed to register %v policy change callback: %v", origin, err)
  59. }
  60. }
  61. if _, err := reader.reload(true); err != nil {
  62. if reader.unregisterChangeNotifier != nil {
  63. reader.unregisterChangeNotifier()
  64. }
  65. return nil, err
  66. }
  67. if expirable, ok := store.(Expirable); ok {
  68. if waitCh := expirable.Done(); waitCh != nil {
  69. go func() {
  70. select {
  71. case <-waitCh:
  72. reader.Close()
  73. case <-reader.doneCh:
  74. }
  75. }()
  76. }
  77. }
  78. return reader, nil
  79. }
  80. // GetSettings returns the current [*setting.Snapshot],
  81. // re-reading it from from the underlying [Store] only if the policy
  82. // has changed since it was read last. It never fails and returns
  83. // the previous version of the policy settings if a read attempt fails.
  84. func (r *Reader) GetSettings() *setting.Snapshot {
  85. r.mu.Lock()
  86. upToDate, lastPolicy := r.upToDate, r.lastPolicy
  87. r.mu.Unlock()
  88. if upToDate {
  89. return lastPolicy
  90. }
  91. policy, err := r.reload(false)
  92. if err != nil {
  93. // If the policy fails to reload completely, log an error and return the last cached version.
  94. // However, errors related to individual policy items are always
  95. // propagated to callers when they fetch those settings.
  96. loggerx.Errorf("failed to reload %v policy: %v", r.origin, err)
  97. }
  98. return policy
  99. }
  100. // ReadSettings reads policy settings from the underlying [Store] even if no
  101. // changes were detected. It returns the new [*setting.Snapshot],nil on
  102. // success or an undefined snapshot (possibly `nil`) along with a non-`nil`
  103. // error in case of failure.
  104. func (r *Reader) ReadSettings() (*setting.Snapshot, error) {
  105. return r.reload(true)
  106. }
  107. // reload is like [Reader.ReadSettings], but allows specifying whether to re-read
  108. // an unchanged policy, and returns the last [*setting.Snapshot] if the read fails.
  109. func (r *Reader) reload(force bool) (*setting.Snapshot, error) {
  110. r.mu.Lock()
  111. defer r.mu.Unlock()
  112. if r.upToDate && !force {
  113. return r.lastPolicy, nil
  114. }
  115. if lockable, ok := r.store.(Lockable); ok {
  116. if err := lockable.Lock(); err != nil {
  117. return r.lastPolicy, err
  118. }
  119. defer lockable.Unlock()
  120. }
  121. r.upToDate = true
  122. metrics.Reset(r.origin)
  123. var m map[pkey.Key]setting.RawItem
  124. if lastPolicyCount := r.lastPolicy.Len(); lastPolicyCount > 0 {
  125. m = make(map[pkey.Key]setting.RawItem, lastPolicyCount)
  126. }
  127. for _, s := range r.settings {
  128. if !r.origin.Scope().IsConfigurableSetting(s) {
  129. // Skip settings that cannot be configured in the current scope.
  130. continue
  131. }
  132. val, err := readPolicySettingValue(r.store, s)
  133. if err != nil && (errors.Is(err, setting.ErrNoSuchKey) || errors.Is(err, setting.ErrNotConfigured)) {
  134. metrics.ReportNotConfigured(r.origin, s)
  135. continue
  136. }
  137. if err == nil {
  138. metrics.ReportConfigured(r.origin, s, val)
  139. } else {
  140. metrics.ReportError(r.origin, s, err)
  141. }
  142. // If there's an error reading a single policy, such as a value type mismatch,
  143. // we'll wrap the error to preserve its text and return it
  144. // whenever someone attempts to fetch the value.
  145. // Otherwise, the errorText will be nil.
  146. errorText := setting.MaybeErrorText(err)
  147. item := setting.RawItemWith(val, errorText, r.origin)
  148. mak.Set(&m, s.Key(), item)
  149. }
  150. newPolicy := setting.NewSnapshot(m, setting.SummaryWith(r.origin))
  151. if r.lastPolicy == nil || !newPolicy.EqualItems(r.lastPolicy) {
  152. r.lastPolicy = newPolicy
  153. }
  154. return r.lastPolicy, nil
  155. }
  156. // ReadingSession is like [Reader], but with a channel that's written
  157. // to when there's a policy change, and closed when the session is terminated.
  158. type ReadingSession struct {
  159. reader *Reader
  160. policyChangedCh chan struct{} // 1-buffered channel
  161. handle set.Handle // in the reader.sessions
  162. closeInternal func()
  163. }
  164. // OpenSession opens and returns a new session to r, allowing the caller
  165. // to get notified whenever a policy change is reported by the [source.Store],
  166. // or an [ErrStoreClosed] if the reader has already been closed.
  167. func (r *Reader) OpenSession() (*ReadingSession, error) {
  168. session := &ReadingSession{
  169. reader: r,
  170. policyChangedCh: make(chan struct{}, 1),
  171. }
  172. session.closeInternal = sync.OnceFunc(func() { close(session.policyChangedCh) })
  173. r.mu.Lock()
  174. defer r.mu.Unlock()
  175. if r.closing {
  176. return nil, ErrStoreClosed
  177. }
  178. session.handle = r.sessions.Add(session)
  179. return session, nil
  180. }
  181. // GetSettings is like [Reader.GetSettings].
  182. func (s *ReadingSession) GetSettings() *setting.Snapshot {
  183. return s.reader.GetSettings()
  184. }
  185. // ReadSettings is like [Reader.ReadSettings].
  186. func (s *ReadingSession) ReadSettings() (*setting.Snapshot, error) {
  187. return s.reader.ReadSettings()
  188. }
  189. // PolicyChanged returns a channel that's written to when
  190. // there's a policy change, closed when the session is terminated.
  191. func (s *ReadingSession) PolicyChanged() <-chan struct{} {
  192. return s.policyChangedCh
  193. }
  194. // Close unregisters this session with the [Reader].
  195. func (s *ReadingSession) Close() {
  196. s.reader.mu.Lock()
  197. delete(s.reader.sessions, s.handle)
  198. s.closeInternal()
  199. s.reader.mu.Unlock()
  200. }
  201. // onPolicyChange handles a policy change notification from the [Store],
  202. // invalidating the current [setting.Snapshot] in r,
  203. // and notifying the active [ReadingSession]s.
  204. func (r *Reader) onPolicyChange() {
  205. r.mu.Lock()
  206. defer r.mu.Unlock()
  207. r.upToDate = false
  208. for _, s := range r.sessions {
  209. select {
  210. case s.policyChangedCh <- struct{}{}:
  211. // Notified.
  212. default:
  213. // 1-buffered channel is full, meaning that another policy change
  214. // notification is already en route.
  215. }
  216. }
  217. }
  218. // Close closes the store reader and the underlying store.
  219. func (r *Reader) Close() error {
  220. r.mu.Lock()
  221. if r.closing {
  222. r.mu.Unlock()
  223. return nil
  224. }
  225. r.closing = true
  226. r.mu.Unlock()
  227. if r.unregisterChangeNotifier != nil {
  228. r.unregisterChangeNotifier()
  229. r.unregisterChangeNotifier = nil
  230. }
  231. if closer, ok := r.store.(io.Closer); ok {
  232. if err := closer.Close(); err != nil {
  233. return err
  234. }
  235. }
  236. r.store = nil
  237. close(r.doneCh)
  238. r.mu.Lock()
  239. defer r.mu.Unlock()
  240. for _, c := range r.sessions {
  241. c.closeInternal()
  242. }
  243. r.sessions = nil
  244. return nil
  245. }
  246. // Done returns a channel that is closed when the reader is closed.
  247. func (r *Reader) Done() <-chan struct{} {
  248. return r.doneCh
  249. }
  250. // ReadableSource is a [Source] open for reading.
  251. type ReadableSource struct {
  252. *Source
  253. *ReadingSession
  254. }
  255. // Close closes the underlying [ReadingSession].
  256. func (s ReadableSource) Close() {
  257. s.ReadingSession.Close()
  258. }
  259. // ReadableSources is a slice of [ReadableSource].
  260. type ReadableSources []ReadableSource
  261. // Contains reports whether s contains the specified source.
  262. func (s ReadableSources) Contains(source *Source) bool {
  263. return s.IndexOf(source) != -1
  264. }
  265. // IndexOf returns position of the specified source in s, or -1
  266. // if the source does not exist.
  267. func (s ReadableSources) IndexOf(source *Source) int {
  268. return slices.IndexFunc(s, func(rs ReadableSource) bool {
  269. return rs.Source == source
  270. })
  271. }
  272. // InsertionIndexOf returns the position at which source can be inserted
  273. // to maintain the sorted order of the readableSources.
  274. // The return value is unspecified if s is not sorted on entry to InsertionIndexOf.
  275. func (s ReadableSources) InsertionIndexOf(source *Source) int {
  276. // Insert new sources after any existing sources with the same precedence,
  277. // and just before the first source with higher precedence.
  278. // Just like stable sort, but for insertion.
  279. // It's okay to use linear search as insertions are rare
  280. // and we never have more than just a few policy sources.
  281. higherPrecedence := func(rs ReadableSource) bool { return rs.Compare(source) > 0 }
  282. if i := slices.IndexFunc(s, higherPrecedence); i != -1 {
  283. return i
  284. }
  285. return len(s)
  286. }
  287. // StableSort sorts [ReadableSource] in s by precedence, so that policy
  288. // settings from sources with higher precedence (e.g., [DeviceScope])
  289. // will be read and merged last, overriding any policy settings with
  290. // the same keys configured in sources with lower precedence
  291. // (e.g., [CurrentUserScope]).
  292. func (s *ReadableSources) StableSort() {
  293. sort.SliceStable(*s, func(i, j int) bool {
  294. return (*s)[i].Source.Compare((*s)[j].Source) < 0
  295. })
  296. }
  297. // DeleteAt closes and deletes the i-th source from s.
  298. func (s *ReadableSources) DeleteAt(i int) {
  299. (*s)[i].Close()
  300. *s = slices.Delete(*s, i, i+1)
  301. }
  302. // Close closes and deletes all sources in s.
  303. func (s *ReadableSources) Close() {
  304. for _, s := range *s {
  305. s.Close()
  306. }
  307. *s = nil
  308. }
  309. func readPolicySettingValue(store Store, s *setting.Definition) (value any, err error) {
  310. switch key := s.Key(); s.Type() {
  311. case setting.BooleanValue:
  312. return store.ReadBoolean(key)
  313. case setting.IntegerValue:
  314. return store.ReadUInt64(key)
  315. case setting.StringValue:
  316. return store.ReadString(key)
  317. case setting.StringListValue:
  318. return store.ReadStringArray(key)
  319. case setting.PreferenceOptionValue:
  320. s, err := store.ReadString(key)
  321. if err == nil {
  322. var value setting.PreferenceOption
  323. if err = value.UnmarshalText([]byte(s)); err == nil {
  324. return value, nil
  325. }
  326. }
  327. return setting.ShowChoiceByPolicy, err
  328. case setting.VisibilityValue:
  329. s, err := store.ReadString(key)
  330. if err == nil {
  331. var value setting.Visibility
  332. if err = value.UnmarshalText([]byte(s)); err == nil {
  333. return value, nil
  334. }
  335. }
  336. return setting.VisibleByPolicy, err
  337. case setting.DurationValue:
  338. s, err := store.ReadString(key)
  339. if err == nil {
  340. var value time.Duration
  341. if value, err = time.ParseDuration(s); err == nil {
  342. return value, nil
  343. }
  344. }
  345. return nil, err
  346. default:
  347. return nil, fmt.Errorf("%w: unsupported setting type: %v", setting.ErrTypeMismatch, s.Type())
  348. }
  349. }