tree_recursive.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. // Copyright (c) 2014-2015 The Notify Authors. All rights reserved.
  2. // Use of this source code is governed by the MIT license that can be
  3. // found in the LICENSE file.
  4. package notify
  5. import "sync"
  6. // watchAdd TODO(rjeczalik)
  7. func watchAdd(nd node, c chan<- EventInfo, e Event) eventDiff {
  8. diff := nd.Watch.Add(c, e)
  9. if wp := nd.Child[""].Watch; len(wp) != 0 {
  10. e = wp.Total()
  11. diff[0] |= e
  12. diff[1] |= e
  13. if diff[0] == diff[1] {
  14. return none
  15. }
  16. }
  17. return diff
  18. }
  19. // watchAddInactive TODO(rjeczalik)
  20. func watchAddInactive(nd node, c chan<- EventInfo, e Event) eventDiff {
  21. wp := nd.Child[""].Watch
  22. if wp == nil {
  23. wp = make(watchpoint)
  24. nd.Child[""] = node{Watch: wp}
  25. }
  26. diff := wp.Add(c, e)
  27. e = nd.Watch.Total()
  28. diff[0] |= e
  29. diff[1] |= e
  30. if diff[0] == diff[1] {
  31. return none
  32. }
  33. return diff
  34. }
  35. // watchCopy TODO(rjeczalik)
  36. func watchCopy(src, dst node) {
  37. for c, e := range src.Watch {
  38. if c == nil {
  39. continue
  40. }
  41. watchAddInactive(dst, c, e)
  42. }
  43. if wpsrc := src.Child[""].Watch; len(wpsrc) != 0 {
  44. wpdst := dst.Child[""].Watch
  45. for c, e := range wpsrc {
  46. if c == nil {
  47. continue
  48. }
  49. wpdst.Add(c, e)
  50. }
  51. }
  52. }
  53. // watchDel TODO(rjeczalik)
  54. func watchDel(nd node, c chan<- EventInfo, e Event) eventDiff {
  55. diff := nd.Watch.Del(c, e)
  56. if wp := nd.Child[""].Watch; len(wp) != 0 {
  57. diffInactive := wp.Del(c, e)
  58. e = wp.Total()
  59. // TODO(rjeczalik): add e if e != all?
  60. diff[0] |= diffInactive[0] | e
  61. diff[1] |= diffInactive[1] | e
  62. if diff[0] == diff[1] {
  63. return none
  64. }
  65. }
  66. return diff
  67. }
  68. // watchTotal TODO(rjeczalik)
  69. func watchTotal(nd node) Event {
  70. e := nd.Watch.Total()
  71. if wp := nd.Child[""].Watch; len(wp) != 0 {
  72. e |= wp.Total()
  73. }
  74. return e
  75. }
  76. // watchIsRecursive TODO(rjeczalik)
  77. func watchIsRecursive(nd node) bool {
  78. ok := nd.Watch.IsRecursive()
  79. // TODO(rjeczalik): add a test for len(wp) != 0 change the condition.
  80. if wp := nd.Child[""].Watch; len(wp) != 0 {
  81. // If a watchpoint holds inactive watchpoints, it means it's a parent
  82. // one, which is recursive by nature even though it may be not recursive
  83. // itself.
  84. ok = true
  85. }
  86. return ok
  87. }
  88. // recursiveTree TODO(rjeczalik)
  89. type recursiveTree struct {
  90. rw sync.RWMutex // protects root
  91. root root
  92. // TODO(rjeczalik): merge watcher + recursiveWatcher after #5 and #6
  93. w interface {
  94. watcher
  95. recursiveWatcher
  96. }
  97. c chan EventInfo
  98. }
  99. // newRecursiveTree TODO(rjeczalik)
  100. func newRecursiveTree(w recursiveWatcher, c chan EventInfo) *recursiveTree {
  101. t := &recursiveTree{
  102. root: root{nd: newnode("")},
  103. w: struct {
  104. watcher
  105. recursiveWatcher
  106. }{w.(watcher), w},
  107. c: c,
  108. }
  109. go t.dispatch()
  110. return t
  111. }
  112. // dispatch TODO(rjeczalik)
  113. func (t *recursiveTree) dispatch() {
  114. for ei := range t.c {
  115. dbgprintf("dispatching %v on %q", ei.Event(), ei.Path())
  116. go func(ei EventInfo) {
  117. nd, ok := node{}, false
  118. dir, base := split(ei.Path())
  119. fn := func(it node, isbase bool) error {
  120. if isbase {
  121. nd = it
  122. } else {
  123. it.Watch.Dispatch(ei, recursive)
  124. }
  125. return nil
  126. }
  127. t.rw.RLock()
  128. defer t.rw.RUnlock()
  129. // Notify recursive watchpoints found on the path.
  130. if err := t.root.WalkPath(dir, fn); err != nil {
  131. dbgprint("dispatch did not reach leaf:", err)
  132. return
  133. }
  134. // Notify parent watchpoint.
  135. nd.Watch.Dispatch(ei, 0)
  136. // If leaf watchpoint exists, notify it.
  137. if nd, ok = nd.Child[base]; ok {
  138. nd.Watch.Dispatch(ei, 0)
  139. }
  140. }(ei)
  141. }
  142. }
  143. // Watch TODO(rjeczalik)
  144. func (t *recursiveTree) Watch(path string, c chan<- EventInfo,
  145. doNotWatch func(string) bool, events ...Event) error {
  146. if c == nil {
  147. panic("notify: Watch using nil channel")
  148. }
  149. // Expanding with empty event set is a nop.
  150. if len(events) == 0 {
  151. return nil
  152. }
  153. path, isrec, err := cleanpath(path)
  154. if err != nil {
  155. return err
  156. }
  157. eventset := joinevents(events)
  158. if isrec {
  159. eventset |= recursive
  160. }
  161. t.rw.Lock()
  162. defer t.rw.Unlock()
  163. // case 1: cur is a child
  164. //
  165. // Look for parent watch which already covers the given path.
  166. parent := node{}
  167. self := false
  168. err = t.root.WalkPath(path, func(nd node, isbase bool) error {
  169. if watchTotal(nd) != 0 {
  170. parent = nd
  171. self = isbase
  172. return errSkip
  173. }
  174. return nil
  175. })
  176. cur := t.root.Add(path) // add after the walk, so it's less to traverse
  177. if err == nil && parent.Watch != nil {
  178. // Parent watch found. Register inactive watchpoint, so we have enough
  179. // information to shrink the eventset on eventual Stop.
  180. // return t.resetwatchpoint(parent, parent, c, eventset|inactive)
  181. var diff eventDiff
  182. if self {
  183. diff = watchAdd(cur, c, eventset)
  184. } else {
  185. diff = watchAddInactive(parent, c, eventset)
  186. }
  187. switch {
  188. case diff == none:
  189. // the parent watchpoint already covers requested subtree with its
  190. // eventset
  191. case diff[0] == 0:
  192. // TODO(rjeczalik): cleanup this panic after implementation is stable
  193. panic("dangling watchpoint: " + parent.Name)
  194. default:
  195. if isrec || watchIsRecursive(parent) {
  196. err = t.w.RecursiveRewatch(parent.Name, parent.Name, diff[0], diff[1])
  197. } else {
  198. err = t.w.Rewatch(parent.Name, diff[0], diff[1])
  199. }
  200. if err != nil {
  201. watchDel(parent, c, diff.Event())
  202. return err
  203. }
  204. watchAdd(cur, c, eventset)
  205. // TODO(rjeczalik): account top-most path for c
  206. return nil
  207. }
  208. if !self {
  209. watchAdd(cur, c, eventset)
  210. }
  211. return nil
  212. }
  213. // case 2: cur is new parent
  214. //
  215. // Look for children nodes, unwatch n-1 of them and rewatch the last one.
  216. var children []node
  217. fn := func(nd node) error {
  218. if len(nd.Watch) == 0 {
  219. return nil
  220. }
  221. children = append(children, nd)
  222. return errSkip
  223. }
  224. switch must(cur.Walk(fn)); len(children) {
  225. case 0:
  226. // no child watches, cur holds a new watch
  227. case 1:
  228. watchAdd(cur, c, eventset) // TODO(rjeczalik): update cache c subtree root?
  229. watchCopy(children[0], cur)
  230. err = t.w.RecursiveRewatch(children[0].Name, cur.Name, watchTotal(children[0]),
  231. watchTotal(cur))
  232. if err != nil {
  233. // Clean inactive watchpoint. The c chan did not exist before.
  234. cur.Child[""] = node{}
  235. delete(cur.Watch, c)
  236. return err
  237. }
  238. return nil
  239. default:
  240. watchAdd(cur, c, eventset)
  241. // Copy children inactive watchpoints to the new parent.
  242. for _, nd := range children {
  243. watchCopy(nd, cur)
  244. }
  245. // Watch parent subtree.
  246. if err = t.w.RecursiveWatch(cur.Name, watchTotal(cur)); err != nil {
  247. // Clean inactive watchpoint. The c chan did not exist before.
  248. cur.Child[""] = node{}
  249. delete(cur.Watch, c)
  250. return err
  251. }
  252. // Unwatch children subtrees.
  253. var e error
  254. for _, nd := range children {
  255. if watchIsRecursive(nd) {
  256. e = t.w.RecursiveUnwatch(nd.Name)
  257. } else {
  258. e = t.w.Unwatch(nd.Name)
  259. }
  260. if e != nil {
  261. err = nonil(err, e)
  262. // TODO(rjeczalik): child is still watched, warn all its watchpoints
  263. // about possible duplicate events via Error event
  264. }
  265. }
  266. return err
  267. }
  268. // case 3: cur is new, alone node
  269. switch diff := watchAdd(cur, c, eventset); {
  270. case diff == none:
  271. // TODO(rjeczalik): cleanup this panic after implementation is stable
  272. panic("watch requested but no parent watchpoint found: " + cur.Name)
  273. case diff[0] == 0:
  274. if isrec {
  275. err = t.w.RecursiveWatch(cur.Name, diff[1])
  276. } else {
  277. err = t.w.Watch(cur.Name, diff[1])
  278. }
  279. if err != nil {
  280. watchDel(cur, c, diff.Event())
  281. return err
  282. }
  283. default:
  284. // TODO(rjeczalik): cleanup this panic after implementation is stable
  285. panic("watch requested but no parent watchpoint found: " + cur.Name)
  286. }
  287. return nil
  288. }
  289. // Stop TODO(rjeczalik)
  290. //
  291. // TODO(rjeczalik): Split parent watchpoint - transfer watches to children
  292. // if parent is no longer needed. This carries a risk that underlying
  293. // watcher calls could fail - reconsider if it's worth the effort.
  294. func (t *recursiveTree) Stop(c chan<- EventInfo) {
  295. var err error
  296. fn := func(nd node) (e error) {
  297. diff := watchDel(nd, c, all)
  298. switch {
  299. case diff == none && watchTotal(nd) == 0:
  300. // TODO(rjeczalik): There's no watchpoints deeper in the tree,
  301. // probably we should remove the nodes as well.
  302. return nil
  303. case diff == none:
  304. // Removing c from nd does not require shrinking its eventset.
  305. case diff[1] == 0:
  306. if watchIsRecursive(nd) {
  307. e = t.w.RecursiveUnwatch(nd.Name)
  308. } else {
  309. e = t.w.Unwatch(nd.Name)
  310. }
  311. default:
  312. if watchIsRecursive(nd) {
  313. e = t.w.RecursiveRewatch(nd.Name, nd.Name, diff[0], diff[1])
  314. } else {
  315. e = t.w.Rewatch(nd.Name, diff[0], diff[1])
  316. }
  317. }
  318. fn := func(nd node) error {
  319. watchDel(nd, c, all)
  320. return nil
  321. }
  322. err = nonil(err, e, nd.Walk(fn))
  323. // TODO(rjeczalik): if e != nil store dummy chan in nd.Watch just to
  324. // retry un/rewatching next time and/or let the user handle the failure
  325. // vie Error event?
  326. return errSkip
  327. }
  328. t.rw.Lock()
  329. e := t.root.Walk("", fn) // TODO(rjeczalik): use max root per c
  330. t.rw.Unlock()
  331. if e != nil {
  332. err = nonil(err, e)
  333. }
  334. dbgprintf("Stop(%p) error: %v\n", c, err)
  335. }
  336. // Close TODO(rjeczalik)
  337. func (t *recursiveTree) Close() error {
  338. err := t.w.Close()
  339. close(t.c)
  340. return err
  341. }