123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- // Copyright (c) 2014-2015 The Notify Authors. All rights reserved.
- // Use of this source code is governed by the MIT license that can be
- // found in the LICENSE file.
- package notify
- import "sync"
- // watchAdd TODO(rjeczalik)
- func watchAdd(nd node, c chan<- EventInfo, e Event) eventDiff {
- diff := nd.Watch.Add(c, e)
- if wp := nd.Child[""].Watch; len(wp) != 0 {
- e = wp.Total()
- diff[0] |= e
- diff[1] |= e
- if diff[0] == diff[1] {
- return none
- }
- }
- return diff
- }
- // watchAddInactive TODO(rjeczalik)
- func watchAddInactive(nd node, c chan<- EventInfo, e Event) eventDiff {
- wp := nd.Child[""].Watch
- if wp == nil {
- wp = make(watchpoint)
- nd.Child[""] = node{Watch: wp}
- }
- diff := wp.Add(c, e)
- e = nd.Watch.Total()
- diff[0] |= e
- diff[1] |= e
- if diff[0] == diff[1] {
- return none
- }
- return diff
- }
- // watchCopy TODO(rjeczalik)
- func watchCopy(src, dst node) {
- for c, e := range src.Watch {
- if c == nil {
- continue
- }
- watchAddInactive(dst, c, e)
- }
- if wpsrc := src.Child[""].Watch; len(wpsrc) != 0 {
- wpdst := dst.Child[""].Watch
- for c, e := range wpsrc {
- if c == nil {
- continue
- }
- wpdst.Add(c, e)
- }
- }
- }
- // watchDel TODO(rjeczalik)
- func watchDel(nd node, c chan<- EventInfo, e Event) eventDiff {
- diff := nd.Watch.Del(c, e)
- if wp := nd.Child[""].Watch; len(wp) != 0 {
- diffInactive := wp.Del(c, e)
- e = wp.Total()
- // TODO(rjeczalik): add e if e != all?
- diff[0] |= diffInactive[0] | e
- diff[1] |= diffInactive[1] | e
- if diff[0] == diff[1] {
- return none
- }
- }
- return diff
- }
- // watchTotal TODO(rjeczalik)
- func watchTotal(nd node) Event {
- e := nd.Watch.Total()
- if wp := nd.Child[""].Watch; len(wp) != 0 {
- e |= wp.Total()
- }
- return e
- }
- // watchIsRecursive TODO(rjeczalik)
- func watchIsRecursive(nd node) bool {
- ok := nd.Watch.IsRecursive()
- // TODO(rjeczalik): add a test for len(wp) != 0 change the condition.
- if wp := nd.Child[""].Watch; len(wp) != 0 {
- // If a watchpoint holds inactive watchpoints, it means it's a parent
- // one, which is recursive by nature even though it may be not recursive
- // itself.
- ok = true
- }
- return ok
- }
- // recursiveTree TODO(rjeczalik)
- type recursiveTree struct {
- rw sync.RWMutex // protects root
- root root
- // TODO(rjeczalik): merge watcher + recursiveWatcher after #5 and #6
- w interface {
- watcher
- recursiveWatcher
- }
- c chan EventInfo
- }
- // newRecursiveTree TODO(rjeczalik)
- func newRecursiveTree(w recursiveWatcher, c chan EventInfo) *recursiveTree {
- t := &recursiveTree{
- root: root{nd: newnode("")},
- w: struct {
- watcher
- recursiveWatcher
- }{w.(watcher), w},
- c: c,
- }
- go t.dispatch()
- return t
- }
- // dispatch TODO(rjeczalik)
- func (t *recursiveTree) dispatch() {
- for ei := range t.c {
- dbgprintf("dispatching %v on %q", ei.Event(), ei.Path())
- go func(ei EventInfo) {
- nd, ok := node{}, false
- dir, base := split(ei.Path())
- fn := func(it node, isbase bool) error {
- if isbase {
- nd = it
- } else {
- it.Watch.Dispatch(ei, recursive)
- }
- return nil
- }
- t.rw.RLock()
- defer t.rw.RUnlock()
- // Notify recursive watchpoints found on the path.
- if err := t.root.WalkPath(dir, fn); err != nil {
- dbgprint("dispatch did not reach leaf:", err)
- return
- }
- // Notify parent watchpoint.
- nd.Watch.Dispatch(ei, 0)
- // If leaf watchpoint exists, notify it.
- if nd, ok = nd.Child[base]; ok {
- nd.Watch.Dispatch(ei, 0)
- }
- }(ei)
- }
- }
- // Watch TODO(rjeczalik)
- func (t *recursiveTree) Watch(path string, c chan<- EventInfo,
- doNotWatch func(string) bool, events ...Event) error {
- if c == nil {
- panic("notify: Watch using nil channel")
- }
- // Expanding with empty event set is a nop.
- if len(events) == 0 {
- return nil
- }
- path, isrec, err := cleanpath(path)
- if err != nil {
- return err
- }
- eventset := joinevents(events)
- if isrec {
- eventset |= recursive
- }
- t.rw.Lock()
- defer t.rw.Unlock()
- // case 1: cur is a child
- //
- // Look for parent watch which already covers the given path.
- parent := node{}
- self := false
- err = t.root.WalkPath(path, func(nd node, isbase bool) error {
- if watchTotal(nd) != 0 {
- parent = nd
- self = isbase
- return errSkip
- }
- return nil
- })
- cur := t.root.Add(path) // add after the walk, so it's less to traverse
- if err == nil && parent.Watch != nil {
- // Parent watch found. Register inactive watchpoint, so we have enough
- // information to shrink the eventset on eventual Stop.
- // return t.resetwatchpoint(parent, parent, c, eventset|inactive)
- var diff eventDiff
- if self {
- diff = watchAdd(cur, c, eventset)
- } else {
- diff = watchAddInactive(parent, c, eventset)
- }
- switch {
- case diff == none:
- // the parent watchpoint already covers requested subtree with its
- // eventset
- case diff[0] == 0:
- // TODO(rjeczalik): cleanup this panic after implementation is stable
- panic("dangling watchpoint: " + parent.Name)
- default:
- if isrec || watchIsRecursive(parent) {
- err = t.w.RecursiveRewatch(parent.Name, parent.Name, diff[0], diff[1])
- } else {
- err = t.w.Rewatch(parent.Name, diff[0], diff[1])
- }
- if err != nil {
- watchDel(parent, c, diff.Event())
- return err
- }
- watchAdd(cur, c, eventset)
- // TODO(rjeczalik): account top-most path for c
- return nil
- }
- if !self {
- watchAdd(cur, c, eventset)
- }
- return nil
- }
- // case 2: cur is new parent
- //
- // Look for children nodes, unwatch n-1 of them and rewatch the last one.
- var children []node
- fn := func(nd node) error {
- if len(nd.Watch) == 0 {
- return nil
- }
- children = append(children, nd)
- return errSkip
- }
- switch must(cur.Walk(fn)); len(children) {
- case 0:
- // no child watches, cur holds a new watch
- case 1:
- watchAdd(cur, c, eventset) // TODO(rjeczalik): update cache c subtree root?
- watchCopy(children[0], cur)
- err = t.w.RecursiveRewatch(children[0].Name, cur.Name, watchTotal(children[0]),
- watchTotal(cur))
- if err != nil {
- // Clean inactive watchpoint. The c chan did not exist before.
- cur.Child[""] = node{}
- delete(cur.Watch, c)
- return err
- }
- return nil
- default:
- watchAdd(cur, c, eventset)
- // Copy children inactive watchpoints to the new parent.
- for _, nd := range children {
- watchCopy(nd, cur)
- }
- // Watch parent subtree.
- if err = t.w.RecursiveWatch(cur.Name, watchTotal(cur)); err != nil {
- // Clean inactive watchpoint. The c chan did not exist before.
- cur.Child[""] = node{}
- delete(cur.Watch, c)
- return err
- }
- // Unwatch children subtrees.
- var e error
- for _, nd := range children {
- if watchIsRecursive(nd) {
- e = t.w.RecursiveUnwatch(nd.Name)
- } else {
- e = t.w.Unwatch(nd.Name)
- }
- if e != nil {
- err = nonil(err, e)
- // TODO(rjeczalik): child is still watched, warn all its watchpoints
- // about possible duplicate events via Error event
- }
- }
- return err
- }
- // case 3: cur is new, alone node
- switch diff := watchAdd(cur, c, eventset); {
- case diff == none:
- // TODO(rjeczalik): cleanup this panic after implementation is stable
- panic("watch requested but no parent watchpoint found: " + cur.Name)
- case diff[0] == 0:
- if isrec {
- err = t.w.RecursiveWatch(cur.Name, diff[1])
- } else {
- err = t.w.Watch(cur.Name, diff[1])
- }
- if err != nil {
- watchDel(cur, c, diff.Event())
- return err
- }
- default:
- // TODO(rjeczalik): cleanup this panic after implementation is stable
- panic("watch requested but no parent watchpoint found: " + cur.Name)
- }
- return nil
- }
- // Stop TODO(rjeczalik)
- //
- // TODO(rjeczalik): Split parent watchpoint - transfer watches to children
- // if parent is no longer needed. This carries a risk that underlying
- // watcher calls could fail - reconsider if it's worth the effort.
- func (t *recursiveTree) Stop(c chan<- EventInfo) {
- var err error
- fn := func(nd node) (e error) {
- diff := watchDel(nd, c, all)
- switch {
- case diff == none && watchTotal(nd) == 0:
- // TODO(rjeczalik): There's no watchpoints deeper in the tree,
- // probably we should remove the nodes as well.
- return nil
- case diff == none:
- // Removing c from nd does not require shrinking its eventset.
- case diff[1] == 0:
- if watchIsRecursive(nd) {
- e = t.w.RecursiveUnwatch(nd.Name)
- } else {
- e = t.w.Unwatch(nd.Name)
- }
- default:
- if watchIsRecursive(nd) {
- e = t.w.RecursiveRewatch(nd.Name, nd.Name, diff[0], diff[1])
- } else {
- e = t.w.Rewatch(nd.Name, diff[0], diff[1])
- }
- }
- fn := func(nd node) error {
- watchDel(nd, c, all)
- return nil
- }
- err = nonil(err, e, nd.Walk(fn))
- // TODO(rjeczalik): if e != nil store dummy chan in nd.Watch just to
- // retry un/rewatching next time and/or let the user handle the failure
- // vie Error event?
- return errSkip
- }
- t.rw.Lock()
- e := t.root.Walk("", fn) // TODO(rjeczalik): use max root per c
- t.rw.Unlock()
- if e != nil {
- err = nonil(err, e)
- }
- dbgprintf("Stop(%p) error: %v\n", c, err)
- }
- // Close TODO(rjeczalik)
- func (t *recursiveTree) Close() error {
- err := t.w.Close()
- close(t.c)
- return err
- }
|