watcher_fsevents.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. // Copyright (c) 2014-2015 The Notify Authors. All rights reserved.
  2. // Use of this source code is governed by the MIT license that can be
  3. // found in the LICENSE file.
  4. // +build darwin,!kqueue
  5. package notify
  6. import (
  7. "errors"
  8. "strings"
  9. "sync/atomic"
  10. )
  11. const (
  12. failure = uint32(FSEventsMustScanSubDirs | FSEventsUserDropped | FSEventsKernelDropped)
  13. filter = uint32(FSEventsCreated | FSEventsRemoved | FSEventsRenamed |
  14. FSEventsModified | FSEventsInodeMetaMod)
  15. )
  16. // FSEvent represents single file event. It is created out of values passed by
  17. // FSEvents to FSEventStreamCallback function.
  18. type FSEvent struct {
  19. Path string // real path of the file or directory
  20. ID uint64 // ID of the event (FSEventStreamEventId)
  21. Flags uint32 // joint FSEvents* flags (FSEventStreamEventFlags)
  22. }
  23. // splitflags separates event flags from single set into slice of flags.
  24. func splitflags(set uint32) (e []uint32) {
  25. for i := uint32(1); set != 0; i, set = i<<1, set>>1 {
  26. if (set & 1) != 0 {
  27. e = append(e, i)
  28. }
  29. }
  30. return
  31. }
  32. // watch represents a filesystem watchpoint. It is a higher level abstraction
  33. // over FSEvents' stream, which implements filtering of file events based
  34. // on path and event set. It emulates non-recursive watch-point by filtering out
  35. // events which paths are more than 1 level deeper than the watched path.
  36. type watch struct {
  37. // prev stores last event set per path in order to filter out old flags
  38. // for new events, which appratenly FSEvents likes to retain. It's a disgusting
  39. // hack, it should be researched how to get rid of it.
  40. prev map[string]uint32
  41. c chan<- EventInfo
  42. stream *stream
  43. path string
  44. events uint32
  45. isrec int32
  46. flushed bool
  47. }
  48. // Example format:
  49. //
  50. // ~ $ (trigger command) # (event set) -> (effective event set)
  51. //
  52. // Heuristics:
  53. //
  54. // 1. Create event is removed when it was present in previous event set.
  55. // Example:
  56. //
  57. // ~ $ echo > file # Create|Write -> Create|Write
  58. // ~ $ echo > file # Create|Write|InodeMetaMod -> Write|InodeMetaMod
  59. //
  60. // 2. Remove event is removed if it was present in previouse event set.
  61. // Example:
  62. //
  63. // ~ $ touch file # Create -> Create
  64. // ~ $ rm file # Create|Remove -> Remove
  65. // ~ $ touch file # Create|Remove -> Create
  66. //
  67. // 3. Write event is removed if not followed by InodeMetaMod on existing
  68. // file. Example:
  69. //
  70. // ~ $ echo > file # Create|Write -> Create|Write
  71. // ~ $ chmod +x file # Create|Write|ChangeOwner -> ChangeOwner
  72. //
  73. // 4. Write&InodeMetaMod is removed when effective event set contain Remove event.
  74. // Example:
  75. //
  76. // ~ $ echo > file # Write|InodeMetaMod -> Write|InodeMetaMod
  77. // ~ $ rm file # Remove|Write|InodeMetaMod -> Remove
  78. //
  79. func (w *watch) strip(base string, set uint32) uint32 {
  80. const (
  81. write = FSEventsModified | FSEventsInodeMetaMod
  82. both = FSEventsCreated | FSEventsRemoved
  83. )
  84. switch w.prev[base] {
  85. case FSEventsCreated:
  86. set &^= FSEventsCreated
  87. if set&FSEventsRemoved != 0 {
  88. w.prev[base] = FSEventsRemoved
  89. set &^= write
  90. }
  91. case FSEventsRemoved:
  92. set &^= FSEventsRemoved
  93. if set&FSEventsCreated != 0 {
  94. w.prev[base] = FSEventsCreated
  95. }
  96. default:
  97. switch set & both {
  98. case FSEventsCreated:
  99. w.prev[base] = FSEventsCreated
  100. case FSEventsRemoved:
  101. w.prev[base] = FSEventsRemoved
  102. set &^= write
  103. }
  104. }
  105. dbgprintf("split()=%v\n", Event(set))
  106. return set
  107. }
  108. // Dispatch is a stream function which forwards given file events for the watched
  109. // path to underlying FileInfo channel.
  110. func (w *watch) Dispatch(ev []FSEvent) {
  111. events := atomic.LoadUint32(&w.events)
  112. isrec := (atomic.LoadInt32(&w.isrec) == 1)
  113. for i := range ev {
  114. if ev[i].Flags&FSEventsHistoryDone != 0 {
  115. w.flushed = true
  116. continue
  117. }
  118. if !w.flushed {
  119. continue
  120. }
  121. dbgprintf("%v (0x%x) (%s, i=%d, ID=%d, len=%d)\n", Event(ev[i].Flags),
  122. ev[i].Flags, ev[i].Path, i, ev[i].ID, len(ev))
  123. if ev[i].Flags&failure != 0 {
  124. // TODO(rjeczalik): missing error handling
  125. continue
  126. }
  127. if !strings.HasPrefix(ev[i].Path, w.path) {
  128. continue
  129. }
  130. n := len(w.path)
  131. base := ""
  132. if len(ev[i].Path) > n {
  133. if ev[i].Path[n] != '/' {
  134. continue
  135. }
  136. base = ev[i].Path[n+1:]
  137. if !isrec && strings.IndexByte(base, '/') != -1 {
  138. continue
  139. }
  140. }
  141. // TODO(rjeczalik): get diff only from filtered events?
  142. e := w.strip(string(base), ev[i].Flags) & events
  143. if e == 0 {
  144. continue
  145. }
  146. for _, e := range splitflags(e) {
  147. dbgprintf("%d: single event: %v", ev[i].ID, Event(e))
  148. w.c <- &event{
  149. fse: ev[i],
  150. event: Event(e),
  151. }
  152. }
  153. }
  154. }
  155. // Stop closes underlying FSEvents stream and stops dispatching events.
  156. func (w *watch) Stop() {
  157. w.stream.Stop()
  158. // TODO(rjeczalik): make (*stream).Stop flush synchronously undelivered events,
  159. // so the following hack can be removed. It should flush all the streams
  160. // concurrently as we care not to block too much here.
  161. atomic.StoreUint32(&w.events, 0)
  162. atomic.StoreInt32(&w.isrec, 0)
  163. }
  164. // fsevents implements Watcher and RecursiveWatcher interfaces backed by FSEvents
  165. // framework.
  166. type fsevents struct {
  167. watches map[string]*watch
  168. c chan<- EventInfo
  169. }
  170. func newWatcher(c chan<- EventInfo) watcher {
  171. return &fsevents{
  172. watches: make(map[string]*watch),
  173. c: c,
  174. }
  175. }
  176. func (fse *fsevents) watch(path string, event Event, isrec int32) (err error) {
  177. if _, ok := fse.watches[path]; ok {
  178. return errAlreadyWatched
  179. }
  180. w := &watch{
  181. prev: make(map[string]uint32),
  182. c: fse.c,
  183. path: path,
  184. events: uint32(event),
  185. isrec: isrec,
  186. }
  187. w.stream = newStream(path, w.Dispatch)
  188. if err = w.stream.Start(); err != nil {
  189. return err
  190. }
  191. fse.watches[path] = w
  192. return nil
  193. }
  194. func (fse *fsevents) unwatch(path string) (err error) {
  195. w, ok := fse.watches[path]
  196. if !ok {
  197. return errNotWatched
  198. }
  199. w.stream.Stop()
  200. delete(fse.watches, path)
  201. return nil
  202. }
  203. // Watch implements Watcher interface. It fails with non-nil error when setting
  204. // the watch-point by FSEvents fails or with errAlreadyWatched error when
  205. // the given path is already watched.
  206. func (fse *fsevents) Watch(path string, event Event) error {
  207. return fse.watch(path, event, 0)
  208. }
  209. // Unwatch implements Watcher interface. It fails with errNotWatched when
  210. // the given path is not being watched.
  211. func (fse *fsevents) Unwatch(path string) error {
  212. return fse.unwatch(path)
  213. }
  214. // Rewatch implements Watcher interface. It fails with errNotWatched when
  215. // the given path is not being watched or with errInvalidEventSet when oldevent
  216. // does not match event set the watch-point currently holds.
  217. func (fse *fsevents) Rewatch(path string, oldevent, newevent Event) error {
  218. w, ok := fse.watches[path]
  219. if !ok {
  220. return errNotWatched
  221. }
  222. if !atomic.CompareAndSwapUint32(&w.events, uint32(oldevent), uint32(newevent)) {
  223. return errInvalidEventSet
  224. }
  225. atomic.StoreInt32(&w.isrec, 0)
  226. return nil
  227. }
  228. // RecursiveWatch implements RecursiveWatcher interface. It fails with non-nil
  229. // error when setting the watch-point by FSEvents fails or with errAlreadyWatched
  230. // error when the given path is already watched.
  231. func (fse *fsevents) RecursiveWatch(path string, event Event) error {
  232. return fse.watch(path, event, 1)
  233. }
  234. // RecursiveUnwatch implements RecursiveWatcher interface. It fails with
  235. // errNotWatched when the given path is not being watched.
  236. //
  237. // TODO(rjeczalik): fail if w.isrec == 0?
  238. func (fse *fsevents) RecursiveUnwatch(path string) error {
  239. return fse.unwatch(path)
  240. }
  241. // RecrusiveRewatch implements RecursiveWatcher interface. It fails:
  242. //
  243. // * with errNotWatched when the given path is not being watched
  244. // * with errInvalidEventSet when oldevent does not match the current event set
  245. // * with errAlreadyWatched when watch-point given by the oldpath was meant to
  246. // be relocated to newpath, but the newpath is already watched
  247. // * a non-nil error when setting the watch-point with FSEvents fails
  248. //
  249. // TODO(rjeczalik): Improve handling of watch-point relocation? See two TODOs
  250. // that follows.
  251. func (fse *fsevents) RecursiveRewatch(oldpath, newpath string, oldevent, newevent Event) error {
  252. switch [2]bool{oldpath == newpath, oldevent == newevent} {
  253. case [2]bool{true, true}:
  254. w, ok := fse.watches[oldpath]
  255. if !ok {
  256. return errNotWatched
  257. }
  258. atomic.StoreInt32(&w.isrec, 1)
  259. return nil
  260. case [2]bool{true, false}:
  261. w, ok := fse.watches[oldpath]
  262. if !ok {
  263. return errNotWatched
  264. }
  265. if !atomic.CompareAndSwapUint32(&w.events, uint32(oldevent), uint32(newevent)) {
  266. return errors.New("invalid event state diff")
  267. }
  268. atomic.StoreInt32(&w.isrec, 1)
  269. return nil
  270. default:
  271. // TODO(rjeczalik): rewatch newpath only if exists?
  272. // TODO(rjeczalik): migrate w.prev to new watch?
  273. if _, ok := fse.watches[newpath]; ok {
  274. return errAlreadyWatched
  275. }
  276. if err := fse.Unwatch(oldpath); err != nil {
  277. return err
  278. }
  279. // TODO(rjeczalik): revert unwatch if watch fails?
  280. return fse.watch(newpath, newevent, 1)
  281. }
  282. }
  283. // Close unwatches all watch-points.
  284. func (fse *fsevents) Close() error {
  285. for _, w := range fse.watches {
  286. w.Stop()
  287. }
  288. fse.watches = nil
  289. return nil
  290. }