watcher_readdcw.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. // Copyright (c) 2014-2015 The Notify Authors. All rights reserved.
  2. // Use of this source code is governed by the MIT license that can be
  3. // found in the LICENSE file.
  4. // +build windows
  5. package notify
  6. import (
  7. "errors"
  8. "runtime"
  9. "sync"
  10. "sync/atomic"
  11. "syscall"
  12. "unsafe"
  13. )
  14. // readBufferSize defines the size of an array in which read statuses are stored.
  15. // The buffer have to be DWORD-aligned and, if notify is used in monitoring a
  16. // directory over the network, its size must not be greater than 64KB. Each of
  17. // watched directories uses its own buffer for storing events.
  18. const readBufferSize = 4096
  19. // Since all operations which go through the Windows completion routine are done
  20. // asynchronously, filter may set one of the constants belor. They were defined
  21. // in order to distinguish whether current folder should be re-registered in
  22. // ReadDirectoryChangesW function or some control operations need to be executed.
  23. const (
  24. stateRewatch uint32 = 1 << (28 + iota)
  25. stateUnwatch
  26. stateCPClose
  27. )
  28. // Filter used in current implementation was split into four segments:
  29. // - bits 0-11 store ReadDirectoryChangesW filters,
  30. // - bits 12-19 store File notify actions,
  31. // - bits 20-27 store notify specific events and flags,
  32. // - bits 28-31 store states which are used in loop's FSM.
  33. // Constants below are used as masks to retrieve only specific filter parts.
  34. const (
  35. onlyNotifyChanges uint32 = 0x00000FFF
  36. onlyNGlobalEvents uint32 = 0x0FF00000
  37. onlyMachineStates uint32 = 0xF0000000
  38. )
  39. // grip represents a single watched directory. It stores the data required by
  40. // ReadDirectoryChangesW function. Only the filter, recursive, and handle members
  41. // may by modified by watcher implementation. Rest of the them have to remain
  42. // constant since they are used by Windows completion routine. This indicates that
  43. // grip can be removed only when all operations on the file handle are finished.
  44. type grip struct {
  45. handle syscall.Handle
  46. filter uint32
  47. recursive bool
  48. pathw []uint16
  49. buffer [readBufferSize]byte
  50. parent *watched
  51. ovlapped *overlappedEx
  52. }
  53. // overlappedEx stores information used in asynchronous input and output.
  54. // Additionally, overlappedEx contains a pointer to 'grip' item which is used in
  55. // order to gather the structure in which the overlappedEx object was created.
  56. type overlappedEx struct {
  57. syscall.Overlapped
  58. parent *grip
  59. }
  60. // newGrip creates a new file handle that can be used in overlapped operations.
  61. // Then, the handle is associated with I/O completion port 'cph' and its value
  62. // is stored in newly created 'grip' object.
  63. func newGrip(cph syscall.Handle, parent *watched, filter uint32) (*grip, error) {
  64. g := &grip{
  65. handle: syscall.InvalidHandle,
  66. filter: filter,
  67. recursive: parent.recursive,
  68. pathw: parent.pathw,
  69. parent: parent,
  70. ovlapped: &overlappedEx{},
  71. }
  72. if err := g.register(cph); err != nil {
  73. return nil, err
  74. }
  75. g.ovlapped.parent = g
  76. return g, nil
  77. }
  78. // NOTE : Thread safe
  79. func (g *grip) register(cph syscall.Handle) (err error) {
  80. if g.handle, err = syscall.CreateFile(
  81. &g.pathw[0],
  82. syscall.FILE_LIST_DIRECTORY,
  83. syscall.FILE_SHARE_READ|syscall.FILE_SHARE_WRITE|syscall.FILE_SHARE_DELETE,
  84. nil,
  85. syscall.OPEN_EXISTING,
  86. syscall.FILE_FLAG_BACKUP_SEMANTICS|syscall.FILE_FLAG_OVERLAPPED,
  87. 0,
  88. ); err != nil {
  89. return
  90. }
  91. if _, err = syscall.CreateIoCompletionPort(g.handle, cph, 0, 0); err != nil {
  92. syscall.CloseHandle(g.handle)
  93. return
  94. }
  95. return g.readDirChanges()
  96. }
  97. // readDirChanges tells the system to store file change information in grip's
  98. // buffer. Directory changes that occur between calls to this function are added
  99. // to the buffer and then, returned with the next call.
  100. func (g *grip) readDirChanges() error {
  101. return syscall.ReadDirectoryChanges(
  102. g.handle,
  103. &g.buffer[0],
  104. uint32(unsafe.Sizeof(g.buffer)),
  105. g.recursive,
  106. encode(g.filter),
  107. nil,
  108. (*syscall.Overlapped)(unsafe.Pointer(g.ovlapped)),
  109. 0,
  110. )
  111. }
  112. // encode transforms a generic filter, which contains platform independent and
  113. // implementation specific bit fields, to value that can be used as NotifyFilter
  114. // parameter in ReadDirectoryChangesW function.
  115. func encode(filter uint32) uint32 {
  116. e := Event(filter & (onlyNGlobalEvents | onlyNotifyChanges))
  117. if e&dirmarker != 0 {
  118. return uint32(FileNotifyChangeDirName)
  119. }
  120. if e&Create != 0 {
  121. e = (e ^ Create) | FileNotifyChangeFileName
  122. }
  123. if e&Remove != 0 {
  124. e = (e ^ Remove) | FileNotifyChangeFileName
  125. }
  126. if e&Write != 0 {
  127. e = (e ^ Write) | FileNotifyChangeAttributes | FileNotifyChangeSize |
  128. FileNotifyChangeCreation | FileNotifyChangeSecurity
  129. }
  130. if e&Rename != 0 {
  131. e = (e ^ Rename) | FileNotifyChangeFileName
  132. }
  133. return uint32(e)
  134. }
  135. // watched is made in order to check whether an action comes from a directory or
  136. // file. This approach requires two file handlers per single monitored folder. The
  137. // second grip handles actions which include creating or deleting a directory. If
  138. // these processes are not monitored, only the first grip is created.
  139. type watched struct {
  140. filter uint32
  141. recursive bool
  142. count uint8
  143. pathw []uint16
  144. digrip [2]*grip
  145. }
  146. // newWatched creates a new watched instance. It splits the filter variable into
  147. // two parts. The first part is responsible for watching all events which can be
  148. // created for a file in watched directory structure and the second one watches
  149. // only directory Create/Remove actions. If all operations succeed, the Create
  150. // message is sent to I/O completion port queue for further processing.
  151. func newWatched(cph syscall.Handle, filter uint32, recursive bool,
  152. path string) (wd *watched, err error) {
  153. wd = &watched{
  154. filter: filter,
  155. recursive: recursive,
  156. }
  157. if wd.pathw, err = syscall.UTF16FromString(path); err != nil {
  158. return
  159. }
  160. if err = wd.recreate(cph); err != nil {
  161. return
  162. }
  163. return wd, nil
  164. }
  165. // TODO : doc
  166. func (wd *watched) recreate(cph syscall.Handle) (err error) {
  167. filefilter := wd.filter &^ uint32(FileNotifyChangeDirName)
  168. if err = wd.updateGrip(0, cph, filefilter == 0, filefilter); err != nil {
  169. return
  170. }
  171. dirfilter := wd.filter & uint32(FileNotifyChangeDirName|Create|Remove)
  172. if err = wd.updateGrip(1, cph, dirfilter == 0, wd.filter|uint32(dirmarker)); err != nil {
  173. return
  174. }
  175. wd.filter &^= onlyMachineStates
  176. return
  177. }
  178. // TODO : doc
  179. func (wd *watched) updateGrip(idx int, cph syscall.Handle, reset bool,
  180. newflag uint32) (err error) {
  181. if reset {
  182. wd.digrip[idx] = nil
  183. } else {
  184. if wd.digrip[idx] == nil {
  185. if wd.digrip[idx], err = newGrip(cph, wd, newflag); err != nil {
  186. wd.closeHandle()
  187. return
  188. }
  189. } else {
  190. wd.digrip[idx].filter = newflag
  191. wd.digrip[idx].recursive = wd.recursive
  192. if err = wd.digrip[idx].register(cph); err != nil {
  193. wd.closeHandle()
  194. return
  195. }
  196. }
  197. wd.count++
  198. }
  199. return
  200. }
  201. // closeHandle closes handles that are stored in digrip array. Function always
  202. // tries to close all of the handlers before it exits, even when there are errors
  203. // returned from the operating system kernel.
  204. func (wd *watched) closeHandle() (err error) {
  205. for _, g := range wd.digrip {
  206. if g != nil && g.handle != syscall.InvalidHandle {
  207. switch suberr := syscall.CloseHandle(g.handle); {
  208. case suberr == nil:
  209. g.handle = syscall.InvalidHandle
  210. case err == nil:
  211. err = suberr
  212. }
  213. }
  214. }
  215. return
  216. }
  217. // watcher implements Watcher interface. It stores a set of watched directories.
  218. // All operations which remove watched objects from map `m` must be performed in
  219. // loop goroutine since these structures are used internally by operating system.
  220. type readdcw struct {
  221. sync.Mutex
  222. m map[string]*watched
  223. cph syscall.Handle
  224. start bool
  225. wg sync.WaitGroup
  226. c chan<- EventInfo
  227. }
  228. // NewWatcher creates new non-recursive watcher backed by ReadDirectoryChangesW.
  229. func newWatcher(c chan<- EventInfo) watcher {
  230. r := &readdcw{
  231. m: make(map[string]*watched),
  232. cph: syscall.InvalidHandle,
  233. c: c,
  234. }
  235. runtime.SetFinalizer(r, func(r *readdcw) {
  236. if r.cph != syscall.InvalidHandle {
  237. syscall.CloseHandle(r.cph)
  238. }
  239. })
  240. return r
  241. }
  242. // Watch implements notify.Watcher interface.
  243. func (r *readdcw) Watch(path string, event Event) error {
  244. return r.watch(path, event, false)
  245. }
  246. // RecursiveWatch implements notify.RecursiveWatcher interface.
  247. func (r *readdcw) RecursiveWatch(path string, event Event) error {
  248. return r.watch(path, event, true)
  249. }
  250. // watch inserts a directory to the group of watched folders. If watched folder
  251. // already exists, function tries to rewatch it with new filters(NOT VALID). Moreover,
  252. // watch starts the main event loop goroutine when called for the first time.
  253. func (r *readdcw) watch(path string, event Event, recursive bool) (err error) {
  254. if event&^(All|fileNotifyChangeAll) != 0 {
  255. return errors.New("notify: unknown event")
  256. }
  257. r.Lock()
  258. wd, ok := r.m[path]
  259. r.Unlock()
  260. if !ok {
  261. if err = r.lazyinit(); err != nil {
  262. return
  263. }
  264. r.Lock()
  265. defer r.Unlock()
  266. if wd, ok = r.m[path]; ok {
  267. dbgprint("watch: exists already")
  268. return
  269. }
  270. if wd, err = newWatched(r.cph, uint32(event), recursive, path); err != nil {
  271. return
  272. }
  273. r.m[path] = wd
  274. dbgprint("watch: new watch added")
  275. } else {
  276. dbgprint("watch: exists already")
  277. }
  278. return nil
  279. }
  280. // lazyinit creates an I/O completion port and starts the main event processing
  281. // loop. This method uses Double-Checked Locking optimization.
  282. func (r *readdcw) lazyinit() (err error) {
  283. invalid := uintptr(syscall.InvalidHandle)
  284. if atomic.LoadUintptr((*uintptr)(&r.cph)) == invalid {
  285. r.Lock()
  286. defer r.Unlock()
  287. if atomic.LoadUintptr((*uintptr)(&r.cph)) == invalid {
  288. cph := syscall.InvalidHandle
  289. if cph, err = syscall.CreateIoCompletionPort(cph, 0, 0, 0); err != nil {
  290. return
  291. }
  292. r.cph, r.start = cph, true
  293. go r.loop()
  294. }
  295. }
  296. return
  297. }
  298. // TODO(pknap) : doc
  299. func (r *readdcw) loop() {
  300. var n, key uint32
  301. var overlapped *syscall.Overlapped
  302. for {
  303. err := syscall.GetQueuedCompletionStatus(r.cph, &n, &key, &overlapped, syscall.INFINITE)
  304. if key == stateCPClose {
  305. r.Lock()
  306. handle := r.cph
  307. r.cph = syscall.InvalidHandle
  308. r.Unlock()
  309. syscall.CloseHandle(handle)
  310. r.wg.Done()
  311. return
  312. }
  313. if overlapped == nil {
  314. // TODO: check key == rewatch delete or 0(panic)
  315. continue
  316. }
  317. overEx := (*overlappedEx)(unsafe.Pointer(overlapped))
  318. if n != 0 {
  319. r.loopevent(n, overEx)
  320. if err = overEx.parent.readDirChanges(); err != nil {
  321. // TODO: error handling
  322. }
  323. }
  324. r.loopstate(overEx)
  325. }
  326. }
  327. // TODO(pknap) : doc
  328. func (r *readdcw) loopstate(overEx *overlappedEx) {
  329. r.Lock()
  330. defer r.Unlock()
  331. filter := overEx.parent.parent.filter
  332. if filter&onlyMachineStates == 0 {
  333. return
  334. }
  335. if overEx.parent.parent.count--; overEx.parent.parent.count == 0 {
  336. switch filter & onlyMachineStates {
  337. case stateRewatch:
  338. dbgprint("loopstate rewatch")
  339. overEx.parent.parent.recreate(r.cph)
  340. case stateUnwatch:
  341. dbgprint("loopstate unwatch")
  342. delete(r.m, syscall.UTF16ToString(overEx.parent.pathw))
  343. case stateCPClose:
  344. default:
  345. panic(`notify: windows loopstate logic error`)
  346. }
  347. }
  348. }
  349. // TODO(pknap) : doc
  350. func (r *readdcw) loopevent(n uint32, overEx *overlappedEx) {
  351. events := []*event{}
  352. var currOffset uint32
  353. for {
  354. raw := (*syscall.FileNotifyInformation)(unsafe.Pointer(&overEx.parent.buffer[currOffset]))
  355. name := syscall.UTF16ToString((*[syscall.MAX_LONG_PATH]uint16)(unsafe.Pointer(&raw.FileName))[:raw.FileNameLength>>1])
  356. events = append(events, &event{
  357. pathw: overEx.parent.pathw,
  358. filter: overEx.parent.filter,
  359. action: raw.Action,
  360. name: name,
  361. })
  362. if raw.NextEntryOffset == 0 {
  363. break
  364. }
  365. if currOffset += raw.NextEntryOffset; currOffset >= n {
  366. break
  367. }
  368. }
  369. r.send(events)
  370. }
  371. // TODO(pknap) : doc
  372. func (r *readdcw) send(es []*event) {
  373. for _, e := range es {
  374. var syse Event
  375. if e.e, syse = decode(e.filter, e.action); e.e == 0 && syse == 0 {
  376. continue
  377. }
  378. switch {
  379. case e.action == syscall.FILE_ACTION_MODIFIED:
  380. e.ftype = fTypeUnknown
  381. case e.filter&uint32(dirmarker) != 0:
  382. e.ftype = fTypeDirectory
  383. default:
  384. e.ftype = fTypeFile
  385. }
  386. switch {
  387. case e.e == 0:
  388. e.e = syse
  389. case syse != 0:
  390. r.c <- &event{
  391. pathw: e.pathw,
  392. name: e.name,
  393. ftype: e.ftype,
  394. action: e.action,
  395. filter: e.filter,
  396. e: syse,
  397. }
  398. }
  399. r.c <- e
  400. }
  401. }
  402. // Rewatch implements notify.Rewatcher interface.
  403. func (r *readdcw) Rewatch(path string, oldevent, newevent Event) error {
  404. return r.rewatch(path, uint32(oldevent), uint32(newevent), false)
  405. }
  406. // RecursiveRewatch implements notify.RecursiveRewatcher interface.
  407. func (r *readdcw) RecursiveRewatch(oldpath, newpath string, oldevent,
  408. newevent Event) error {
  409. if oldpath != newpath {
  410. if err := r.unwatch(oldpath); err != nil {
  411. return err
  412. }
  413. return r.watch(newpath, newevent, true)
  414. }
  415. return r.rewatch(newpath, uint32(oldevent), uint32(newevent), true)
  416. }
  417. // TODO : (pknap) doc.
  418. func (r *readdcw) rewatch(path string, oldevent, newevent uint32, recursive bool) (err error) {
  419. if Event(newevent)&^(All|fileNotifyChangeAll) != 0 {
  420. return errors.New("notify: unknown event")
  421. }
  422. var wd *watched
  423. r.Lock()
  424. defer r.Unlock()
  425. if wd, err = r.nonStateWatchedLocked(path); err != nil {
  426. return
  427. }
  428. if wd.filter&(onlyNotifyChanges|onlyNGlobalEvents) != oldevent {
  429. panic(`notify: windows re-watcher logic error`)
  430. }
  431. wd.filter = stateRewatch | newevent
  432. wd.recursive, recursive = recursive, wd.recursive
  433. if err = wd.closeHandle(); err != nil {
  434. wd.filter = oldevent
  435. wd.recursive = recursive
  436. return
  437. }
  438. return
  439. }
  440. // TODO : pknap
  441. func (r *readdcw) nonStateWatchedLocked(path string) (wd *watched, err error) {
  442. wd, ok := r.m[path]
  443. if !ok || wd == nil {
  444. err = errors.New(`notify: ` + path + ` path is unwatched`)
  445. return
  446. }
  447. if wd.filter&onlyMachineStates != 0 {
  448. err = errors.New(`notify: another re/unwatching operation in progress`)
  449. return
  450. }
  451. return
  452. }
  453. // Unwatch implements notify.Watcher interface.
  454. func (r *readdcw) Unwatch(path string) error {
  455. return r.unwatch(path)
  456. }
  457. // RecursiveUnwatch implements notify.RecursiveWatcher interface.
  458. func (r *readdcw) RecursiveUnwatch(path string) error {
  459. return r.unwatch(path)
  460. }
  461. // TODO : pknap
  462. func (r *readdcw) unwatch(path string) (err error) {
  463. var wd *watched
  464. r.Lock()
  465. defer r.Unlock()
  466. if wd, err = r.nonStateWatchedLocked(path); err != nil {
  467. return
  468. }
  469. wd.filter |= stateUnwatch
  470. if err = wd.closeHandle(); err != nil {
  471. wd.filter &^= stateUnwatch
  472. return
  473. }
  474. if _, attrErr := syscall.GetFileAttributes(&wd.pathw[0]); attrErr != nil {
  475. for _, g := range wd.digrip {
  476. if g != nil {
  477. dbgprint("unwatch: posting")
  478. if err = syscall.PostQueuedCompletionStatus(r.cph, 0, 0, (*syscall.Overlapped)(unsafe.Pointer(g.ovlapped))); err != nil {
  479. wd.filter &^= stateUnwatch
  480. return
  481. }
  482. }
  483. }
  484. }
  485. return
  486. }
  487. // Close resets the whole watcher object, closes all existing file descriptors,
  488. // and sends stateCPClose state as completion key to the main watcher's loop.
  489. func (r *readdcw) Close() (err error) {
  490. r.Lock()
  491. if !r.start {
  492. r.Unlock()
  493. return nil
  494. }
  495. for _, wd := range r.m {
  496. wd.filter &^= onlyMachineStates
  497. wd.filter |= stateCPClose
  498. if e := wd.closeHandle(); e != nil && err == nil {
  499. err = e
  500. }
  501. }
  502. r.start = false
  503. r.Unlock()
  504. r.wg.Add(1)
  505. if e := syscall.PostQueuedCompletionStatus(r.cph, 0, stateCPClose, nil); e != nil && err == nil {
  506. return e
  507. }
  508. r.wg.Wait()
  509. return
  510. }
  511. // decode creates a notify event from both non-raw filter and action which was
  512. // returned from completion routine. Function may return Event(0) in case when
  513. // filter was replaced by a new value which does not contain fields that are
  514. // valid with passed action.
  515. func decode(filter, action uint32) (Event, Event) {
  516. switch action {
  517. case syscall.FILE_ACTION_ADDED:
  518. return gensys(filter, Create, FileActionAdded)
  519. case syscall.FILE_ACTION_REMOVED:
  520. return gensys(filter, Remove, FileActionRemoved)
  521. case syscall.FILE_ACTION_MODIFIED:
  522. return gensys(filter, Write, FileActionModified)
  523. case syscall.FILE_ACTION_RENAMED_OLD_NAME:
  524. return gensys(filter, Rename, FileActionRenamedOldName)
  525. case syscall.FILE_ACTION_RENAMED_NEW_NAME:
  526. return gensys(filter, Rename, FileActionRenamedNewName)
  527. }
  528. panic(`notify: cannot decode internal mask`)
  529. }
  530. // gensys decides whether the Windows action, system-independent event or both
  531. // of them should be returned. Since the grip's filter may be atomically changed
  532. // during watcher lifetime, it is possible that neither Windows nor notify masks
  533. // are watched by the user when this function is called.
  534. func gensys(filter uint32, ge, se Event) (gene, syse Event) {
  535. isdir := filter&uint32(dirmarker) != 0
  536. if isdir && filter&uint32(FileNotifyChangeDirName) != 0 ||
  537. !isdir && filter&uint32(FileNotifyChangeFileName) != 0 ||
  538. filter&uint32(fileNotifyChangeModified) != 0 {
  539. syse = se
  540. }
  541. if filter&uint32(ge) != 0 {
  542. gene = ge
  543. }
  544. return
  545. }