watcher_trigger.go 11 KB


  1. // Copyright (c) 2014-2015 The Notify Authors. All rights reserved.
  2. // Use of this source code is governed by the MIT license that can be
  3. // found in the LICENSE file.
  4. // +build darwin,kqueue dragonfly freebsd netbsd openbsd solaris
  5. // watcher_trigger is used for FEN and kqueue which behave similarly:
  6. // only files and dirs can be watched directly, but not files inside dirs.
  7. // As a result Create events have to be generated by implementation when
  8. // after Write event is returned for watched dir, it is rescanned and Create
  9. // event is returned for new files and these are automatically added
  10. // to watchlist. In case of removal of watched directory, native system returns
  11. // events for all files, but for Rename, they also need to be generated.
  12. // As a result native system works as something like trigger for rescan,
  13. // but contains additional data about dir in which changes occurred. For files
  14. // detailed data is returned.
  15. // Usage of watcher_trigger requires:
  16. // - trigger implementation,
  17. // - encode func,
  18. // - not2nat, nat2not maps.
  19. // Required manual operations on filesystem can lead to loss of precision.
  20. package notify
  21. import (
  22. "fmt"
  23. "os"
  24. "path/filepath"
  25. "strings"
  26. "sync"
  27. "syscall"
  28. )
  29. // trigger is to be implemented by platform implementation like FEN or kqueue.
  30. type trigger interface {
  31. // Close closes watcher's main native file descriptor.
  32. Close() error
  33. // Stop waiting for new events.
  34. Stop() error
  35. // Create new instance of watched.
  36. NewWatched(string, os.FileInfo) (*watched, error)
  37. // Record internally new *watched instance.
  38. Record(*watched)
  39. // Del removes internal copy of *watched instance.
  40. Del(*watched)
  41. // Watched returns *watched instance and native events for native type.
  42. Watched(interface{}) (*watched, int64, error)
  43. // Init initializes native watcher call.
  44. Init() error
  45. // Watch starts watching provided file/dir.
  46. Watch(os.FileInfo, *watched, int64) error
  47. // Unwatch stops watching provided file/dir.
  48. Unwatch(*watched) error
  49. // Wait for new events.
  50. Wait() (interface{}, error)
  51. // IsStop checks if Wait finished because of request watcher's stop.
  52. IsStop(n interface{}, err error) bool
  53. }
  54. // trgWatched is a the base data structure representing watched file/directory.
  55. // The platform specific full data structure (watched) must embed this type.
  56. type trgWatched struct {
  57. // p is a path to watched file/directory.
  58. p string
  59. // fi provides information about watched file/dir.
  60. fi os.FileInfo
  61. // eDir represents events watched directly.
  62. eDir Event
  63. // eNonDir represents events watched indirectly.
  64. eNonDir Event
  65. }
  66. // encode Event to native representation. Implementation is to be provided by
  67. // platform specific implementation.
  68. var encode func(Event, bool) int64
  69. var (
  70. // nat2not matches native events to notify's ones. To be initialized by
  71. // platform dependent implementation.
  72. nat2not map[Event]Event
  73. // not2nat matches notify's events to native ones. To be initialized by
  74. // platform dependent implementation.
  75. not2nat map[Event]Event
  76. )
  77. // trg is a main structure implementing watcher.
  78. type trg struct {
  79. sync.Mutex
  80. // s is a channel used to stop monitoring.
  81. s chan struct{}
  82. // c is a channel used to pass events further.
  83. c chan<- EventInfo
  84. // pthLkp is a data structure mapping file names with data about watching
  85. // represented by them files/directories.
  86. pthLkp map[string]*watched
  87. // t is a platform dependent implementation of trigger.
  88. t trigger
  89. }
  90. // newWatcher returns new watcher's implementation.
  91. func newWatcher(c chan<- EventInfo) watcher {
  92. t := &trg{
  93. s: make(chan struct{}, 1),
  94. pthLkp: make(map[string]*watched, 0),
  95. c: c,
  96. }
  97. t.t = newTrigger(t.pthLkp)
  98. if err := t.t.Init(); err != nil {
  99. panic(err)
  100. }
  101. go t.monitor()
  102. return t
  103. }
  104. // Close implements watcher.
  105. func (t *trg) Close() (err error) {
  106. t.Lock()
  107. if err = t.t.Stop(); err != nil {
  108. t.Unlock()
  109. return
  110. }
  111. <-t.s
  112. var e error
  113. for _, w := range t.pthLkp {
  114. if e = t.unwatch(w.p, w.fi); e != nil {
  115. dbgprintf("trg: unwatch %q failed: %q\n", w.p, e)
  116. err = nonil(err, e)
  117. }
  118. }
  119. if e = t.t.Close(); e != nil {
  120. dbgprintf("trg: closing native watch failed: %q\n", e)
  121. err = nonil(err, e)
  122. }
  123. if remaining := len(t.pthLkp); remaining != 0 {
  124. err = nonil(err, fmt.Errorf("Not all watches were removed: len(t.pthLkp) == %v", len(t.pthLkp)))
  125. }
  126. t.Unlock()
  127. return
  128. }
  129. // send reported events one by one through chan.
  130. func (t *trg) send(evn []event) {
  131. for i := range evn {
  132. t.c <- &evn[i]
  133. }
  134. }
  135. // singlewatch starts to watch given p file/directory.
  136. func (t *trg) singlewatch(p string, e Event, direct mode, fi os.FileInfo) (err error) {
  137. w, ok := t.pthLkp[p]
  138. if !ok {
  139. if w, err = t.t.NewWatched(p, fi); err != nil {
  140. return
  141. }
  142. }
  143. switch direct {
  144. case dir:
  145. w.eDir |= e
  146. case ndir:
  147. w.eNonDir |= e
  148. case both:
  149. w.eDir |= e
  150. w.eNonDir |= e
  151. }
  152. if err = t.t.Watch(fi, w, encode(w.eDir|w.eNonDir, fi.IsDir())); err != nil {
  153. return
  154. }
  155. if !ok {
  156. t.t.Record(w)
  157. return nil
  158. }
  159. return errAlreadyWatched
  160. }
  161. // decode converts event received from native to notify.Event
  162. // representation taking into account requested events (w).
  163. func decode(o int64, w Event) (e Event) {
  164. for f, n := range nat2not {
  165. if o&int64(f) != 0 {
  166. if w&f != 0 {
  167. e |= f
  168. }
  169. if w&n != 0 {
  170. e |= n
  171. }
  172. }
  173. }
  174. return
  175. }
  176. func (t *trg) watch(p string, e Event, fi os.FileInfo) error {
  177. if err := t.singlewatch(p, e, dir, fi); err != nil {
  178. if err != errAlreadyWatched {
  179. return err
  180. }
  181. }
  182. if fi.IsDir() {
  183. err := t.walk(p, func(fi os.FileInfo) (err error) {
  184. if err = t.singlewatch(filepath.Join(p, fi.Name()), e, ndir,
  185. fi); err != nil {
  186. if err != errAlreadyWatched {
  187. return
  188. }
  189. }
  190. return nil
  191. })
  192. if err != nil {
  193. return err
  194. }
  195. }
  196. return nil
  197. }
  198. // walk runs f func on each file/dir from p directory.
  199. func (t *trg) walk(p string, fn func(os.FileInfo) error) error {
  200. fp, err := os.Open(p)
  201. if err != nil {
  202. return err
  203. }
  204. ls, err := fp.Readdir(0)
  205. fp.Close()
  206. if err != nil {
  207. return err
  208. }
  209. for i := range ls {
  210. if err := fn(ls[i]); err != nil {
  211. return err
  212. }
  213. }
  214. return nil
  215. }
  216. func (t *trg) unwatch(p string, fi os.FileInfo) error {
  217. if fi.IsDir() {
  218. err := t.walk(p, func(fi os.FileInfo) error {
  219. err := t.singleunwatch(filepath.Join(p, fi.Name()), ndir)
  220. if err != errNotWatched {
  221. return err
  222. }
  223. return nil
  224. })
  225. if err != nil {
  226. return err
  227. }
  228. }
  229. return t.singleunwatch(p, dir)
  230. }
  231. // Watch implements Watcher interface.
  232. func (t *trg) Watch(p string, e Event) error {
  233. fi, err := os.Stat(p)
  234. if err != nil {
  235. return err
  236. }
  237. t.Lock()
  238. err = t.watch(p, e, fi)
  239. t.Unlock()
  240. return err
  241. }
  242. // Unwatch implements Watcher interface.
  243. func (t *trg) Unwatch(p string) error {
  244. fi, err := os.Stat(p)
  245. if err != nil {
  246. return err
  247. }
  248. t.Lock()
  249. err = t.unwatch(p, fi)
  250. t.Unlock()
  251. return err
  252. }
  253. // Rewatch implements Watcher interface.
  254. //
  255. // TODO(rjeczalik): This is a naive hack. Rewrite might help.
  256. func (t *trg) Rewatch(p string, _, e Event) error {
  257. fi, err := os.Stat(p)
  258. if err != nil {
  259. return err
  260. }
  261. t.Lock()
  262. if err = t.unwatch(p, fi); err == nil {
  263. // TODO(rjeczalik): If watch fails then we leave trigger in inconsistent
  264. // state. Handle? Panic? Native version of rewatch?
  265. err = t.watch(p, e, fi)
  266. }
  267. t.Unlock()
  268. return nil
  269. }
  270. func (*trg) file(w *watched, n interface{}, e Event) (evn []event) {
  271. evn = append(evn, event{w.p, e, w.fi.IsDir(), n})
  272. return
  273. }
  274. func (t *trg) dir(w *watched, n interface{}, e, ge Event) (evn []event) {
  275. // If it's dir and delete we have to send it and continue, because
  276. // other processing relies on opening (in this case not existing) dir.
  277. // Events for contents of this dir are reported by native impl.
  278. // However events for rename must be generated for all monitored files
  279. // inside of moved directory, because native impl does not report it independently
  280. // for each file descriptor being moved in result of move action on
  281. // parent directory.
  282. if (ge & (not2nat[Rename] | not2nat[Remove])) != 0 {
  283. // Write is reported also for Remove on directory. Because of that
  284. // we have to filter it out explicitly.
  285. evn = append(evn, event{w.p, e & ^Write & ^not2nat[Write], true, n})
  286. if ge&not2nat[Rename] != 0 {
  287. for p := range t.pthLkp {
  288. if strings.HasPrefix(p, w.p+string(os.PathSeparator)) {
  289. if err := t.singleunwatch(p, both); err != nil && err != errNotWatched &&
  290. !os.IsNotExist(err) {
  291. dbgprintf("trg: failed stop watching moved file (%q): %q\n",
  292. p, err)
  293. }
  294. if (w.eDir|w.eNonDir)&(not2nat[Rename]|Rename) != 0 {
  295. evn = append(evn, event{
  296. p, (w.eDir | w.eNonDir) & e &^ Write &^ not2nat[Write],
  297. w.fi.IsDir(), nil,
  298. })
  299. }
  300. }
  301. }
  302. }
  303. t.t.Del(w)
  304. return
  305. }
  306. if (ge & not2nat[Write]) != 0 {
  307. switch err := t.walk(w.p, func(fi os.FileInfo) error {
  308. p := filepath.Join(w.p, fi.Name())
  309. switch err := t.singlewatch(p, w.eDir, ndir, fi); {
  310. case os.IsNotExist(err) && ((w.eDir & Remove) != 0):
  311. evn = append(evn, event{p, Remove, fi.IsDir(), n})
  312. case err == errAlreadyWatched:
  313. case err != nil:
  314. dbgprintf("trg: watching %q failed: %q", p, err)
  315. case (w.eDir & Create) != 0:
  316. evn = append(evn, event{p, Create, fi.IsDir(), n})
  317. default:
  318. }
  319. return nil
  320. }); {
  321. case os.IsNotExist(err):
  322. return
  323. case err != nil:
  324. dbgprintf("trg: dir processing failed: %q", err)
  325. default:
  326. }
  327. }
  328. return
  329. }
  330. type mode uint
  331. const (
  332. dir mode = iota
  333. ndir
  334. both
  335. )
  336. // unwatch stops watching p file/directory.
  337. func (t *trg) singleunwatch(p string, direct mode) error {
  338. w, ok := t.pthLkp[p]
  339. if !ok {
  340. return errNotWatched
  341. }
  342. switch direct {
  343. case dir:
  344. w.eDir = 0
  345. case ndir:
  346. w.eNonDir = 0
  347. case both:
  348. w.eDir, w.eNonDir = 0, 0
  349. }
  350. if err := t.t.Unwatch(w); err != nil {
  351. return err
  352. }
  353. if w.eNonDir|w.eDir != 0 {
  354. mod := dir
  355. if w.eNonDir != 0 {
  356. mod = ndir
  357. }
  358. if err := t.singlewatch(p, w.eNonDir|w.eDir, mod,
  359. w.fi); err != nil && err != errAlreadyWatched {
  360. return err
  361. }
  362. } else {
  363. t.t.Del(w)
  364. }
  365. return nil
  366. }
  367. func (t *trg) monitor() {
  368. var (
  369. n interface{}
  370. err error
  371. )
  372. for {
  373. switch n, err = t.t.Wait(); {
  374. case err == syscall.EINTR:
  375. case t.t.IsStop(n, err):
  376. t.s <- struct{}{}
  377. return
  378. case err != nil:
  379. dbgprintf("trg: failed to read events: %q\n", err)
  380. default:
  381. t.send(t.process(n))
  382. }
  383. }
  384. }
  385. // process event returned by native call.
  386. func (t *trg) process(n interface{}) (evn []event) {
  387. t.Lock()
  388. w, ge, err := t.t.Watched(n)
  389. if err != nil {
  390. t.Unlock()
  391. dbgprintf("trg: %v event lookup failed: %q", Event(ge), err)
  392. return
  393. }
  394. e := decode(ge, w.eDir|w.eNonDir)
  395. if ge&int64(not2nat[Remove]|not2nat[Rename]) == 0 {
  396. switch fi, err := os.Stat(w.p); {
  397. case err != nil:
  398. default:
  399. if err = t.t.Watch(fi, w, encode(w.eDir|w.eNonDir, fi.IsDir())); err != nil {
  400. dbgprintf("trg: %q is no longer watched: %q", w.p, err)
  401. t.t.Del(w)
  402. }
  403. }
  404. }
  405. if e == Event(0) && (!w.fi.IsDir() || (ge&int64(not2nat[Write])) == 0) {
  406. t.Unlock()
  407. return
  408. }
  409. if w.fi.IsDir() {
  410. evn = append(evn, t.dir(w, n, e, Event(ge))...)
  411. } else {
  412. evn = append(evn, t.file(w, n, e)...)
  413. }
  414. if Event(ge)&(not2nat[Remove]|not2nat[Rename]) != 0 {
  415. t.t.Del(w)
  416. }
  417. t.Unlock()
  418. return
  419. }