1
0

eventloop.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. package eventloop
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. "github.com/dop251/goja"
  7. )
  8. type job struct {
  9. cancel func() bool
  10. fn func()
  11. idx int
  12. cancelled bool
  13. }
  14. type Timer struct {
  15. job
  16. timer *time.Timer
  17. }
  18. type Interval struct {
  19. job
  20. ticker *time.Ticker
  21. stopChan chan struct{}
  22. }
  23. type Immediate struct {
  24. job
  25. }
  26. type EventLoop struct {
  27. vm *goja.Runtime
  28. jobChan chan func()
  29. jobs []*job
  30. jobCount int32
  31. canRun int32
  32. auxJobsLock sync.Mutex
  33. wakeupChan chan struct{}
  34. auxJobsSpare, auxJobs []func()
  35. stopLock sync.Mutex
  36. stopCond *sync.Cond
  37. running bool
  38. terminated bool
  39. errorHandler func(error)
  40. }
  41. func Enable(runtime *goja.Runtime, errorHandler func(error)) *EventLoop {
  42. loop := &EventLoop{
  43. vm: runtime,
  44. jobChan: make(chan func()),
  45. wakeupChan: make(chan struct{}, 1),
  46. errorHandler: errorHandler,
  47. }
  48. loop.stopCond = sync.NewCond(&loop.stopLock)
  49. runtime.Set("setTimeout", loop.setTimeout)
  50. runtime.Set("setInterval", loop.setInterval)
  51. runtime.Set("setImmediate", loop.setImmediate)
  52. runtime.Set("clearTimeout", loop.clearTimeout)
  53. runtime.Set("clearInterval", loop.clearInterval)
  54. runtime.Set("clearImmediate", loop.clearImmediate)
  55. return loop
  56. }
  57. func (loop *EventLoop) schedule(call goja.FunctionCall, repeating bool) goja.Value {
  58. if fn, ok := goja.AssertFunction(call.Argument(0)); ok {
  59. delay := call.Argument(1).ToInteger()
  60. var args []goja.Value
  61. if len(call.Arguments) > 2 {
  62. args = append(args, call.Arguments[2:]...)
  63. }
  64. f := func() {
  65. _, err := fn(nil, args...)
  66. if err != nil {
  67. loop.errorHandler(err)
  68. }
  69. }
  70. loop.jobCount++
  71. var job *job
  72. var ret goja.Value
  73. if repeating {
  74. interval := loop.newInterval(f)
  75. interval.start(loop, time.Duration(delay)*time.Millisecond)
  76. job = &interval.job
  77. ret = loop.vm.ToValue(interval)
  78. } else {
  79. timeout := loop.newTimeout(f)
  80. timeout.start(loop, time.Duration(delay)*time.Millisecond)
  81. job = &timeout.job
  82. ret = loop.vm.ToValue(timeout)
  83. }
  84. job.idx = len(loop.jobs)
  85. loop.jobs = append(loop.jobs, job)
  86. return ret
  87. }
  88. return nil
  89. }
  90. func (loop *EventLoop) setTimeout(call goja.FunctionCall) goja.Value {
  91. return loop.schedule(call, false)
  92. }
  93. func (loop *EventLoop) setInterval(call goja.FunctionCall) goja.Value {
  94. return loop.schedule(call, true)
  95. }
  96. func (loop *EventLoop) setImmediate(call goja.FunctionCall) goja.Value {
  97. if fn, ok := goja.AssertFunction(call.Argument(0)); ok {
  98. var args []goja.Value
  99. if len(call.Arguments) > 1 {
  100. args = append(args, call.Arguments[1:]...)
  101. }
  102. f := func() {
  103. _, err := fn(nil, args...)
  104. if err != nil {
  105. loop.errorHandler(err)
  106. }
  107. }
  108. loop.jobCount++
  109. return loop.vm.ToValue(loop.addImmediate(f))
  110. }
  111. return nil
  112. }
  113. // SetTimeout schedules to run the specified function in the context
  114. // of the loop as soon as possible after the specified timeout period.
  115. // SetTimeout returns a Timer which can be passed to ClearTimeout.
  116. // The instance of goja.Runtime that is passed to the function and any Values derived
  117. // from it must not be used outside the function. SetTimeout is
  118. // safe to call inside or outside the loop.
  119. // If the loop is terminated (see Terminate()) returns nil.
  120. func (loop *EventLoop) SetTimeout(fn func(*goja.Runtime), timeout time.Duration) *Timer {
  121. t := loop.newTimeout(func() { fn(loop.vm) })
  122. if loop.addAuxJob(func() {
  123. t.start(loop, timeout)
  124. loop.jobCount++
  125. t.idx = len(loop.jobs)
  126. loop.jobs = append(loop.jobs, &t.job)
  127. }) {
  128. return t
  129. }
  130. return nil
  131. }
  132. // ClearTimeout cancels a Timer returned by SetTimeout if it has not run yet.
  133. // ClearTimeout is safe to call inside or outside the loop.
  134. func (loop *EventLoop) ClearTimeout(t *Timer) {
  135. loop.addAuxJob(func() {
  136. loop.clearTimeout(t)
  137. })
  138. }
  139. // SetInterval schedules to repeatedly run the specified function in
  140. // the context of the loop as soon as possible after every specified
  141. // timeout period. SetInterval returns an Interval which can be
  142. // passed to ClearInterval. The instance of goja.Runtime that is passed to the
  143. // function and any Values derived from it must not be used outside
  144. // the function. SetInterval is safe to call inside or outside the
  145. // loop.
  146. // If the loop is terminated (see Terminate()) returns nil.
  147. func (loop *EventLoop) SetInterval(fn func(*goja.Runtime), timeout time.Duration) *Interval {
  148. i := loop.newInterval(func() { fn(loop.vm) })
  149. if loop.addAuxJob(func() {
  150. i.start(loop, timeout)
  151. loop.jobCount++
  152. i.idx = len(loop.jobs)
  153. loop.jobs = append(loop.jobs, &i.job)
  154. }) {
  155. return i
  156. }
  157. return nil
  158. }
  159. // ClearInterval cancels an Interval returned by SetInterval.
  160. // ClearInterval is safe to call inside or outside the loop.
  161. func (loop *EventLoop) ClearInterval(i *Interval) {
  162. loop.addAuxJob(func() {
  163. loop.clearInterval(i)
  164. })
  165. }
  166. func (loop *EventLoop) setRunning() {
  167. loop.stopLock.Lock()
  168. defer loop.stopLock.Unlock()
  169. if loop.running {
  170. panic("Loop is already started")
  171. }
  172. loop.running = true
  173. atomic.StoreInt32(&loop.canRun, 1)
  174. loop.auxJobsLock.Lock()
  175. loop.terminated = false
  176. loop.auxJobsLock.Unlock()
  177. }
  178. // Run calls the specified function, starts the event loop and waits until there are no more delayed jobs to run
  179. // after which it stops the loop and returns.
  180. // The instance of goja.Runtime that is passed to the function and any Values derived from it must not be used
  181. // outside the function.
  182. // Do NOT use this function while the loop is already running. Use RunOnLoop() instead.
  183. // If the loop is already started it will panic.
  184. func (loop *EventLoop) Run(fn func(*goja.Runtime)) {
  185. loop.setRunning()
  186. fn(loop.vm)
  187. loop.run(false)
  188. }
  189. // Start the event loop in the background. The loop continues to run until Stop() is called.
  190. // If the loop is already started it will panic.
  191. func (loop *EventLoop) Start() {
  192. loop.setRunning()
  193. go loop.run(true)
  194. }
  195. // StartInForeground starts the event loop in the current goroutine. The loop continues to run until Stop() is called.
  196. // If the loop is already started it will panic.
  197. // Use this instead of Start if you want to recover from panics that may occur while calling native Go functions from
  198. // within setInterval and setTimeout callbacks.
  199. func (loop *EventLoop) StartInForeground() {
  200. loop.setRunning()
  201. loop.run(true)
  202. }
  203. // Stop the loop that was started with Start(). After this function returns there will be no more jobs executed
  204. // by the loop. It is possible to call Start() or Run() again after this to resume the execution.
  205. // Note, it does not cancel active timeouts (use Terminate() instead if you want this).
  206. // It is not allowed to run Start() (or Run()) and Stop() or Terminate() concurrently.
  207. // Calling Stop() on a non-running loop has no effect.
  208. // It is not allowed to call Stop() from the loop, because it is synchronous and cannot complete until the loop
  209. // is not running any jobs. Use StopNoWait() instead.
  210. // return number of jobs remaining
  211. func (loop *EventLoop) Stop() int {
  212. loop.stopLock.Lock()
  213. for loop.running {
  214. atomic.StoreInt32(&loop.canRun, 0)
  215. loop.wakeup()
  216. loop.stopCond.Wait()
  217. }
  218. loop.stopLock.Unlock()
  219. return int(loop.jobCount)
  220. }
  221. // StopNoWait tells the loop to stop and returns immediately. Can be used inside the loop. Calling it on a
  222. // non-running loop has no effect.
  223. func (loop *EventLoop) StopNoWait() {
  224. loop.stopLock.Lock()
  225. if loop.running {
  226. atomic.StoreInt32(&loop.canRun, 0)
  227. loop.wakeup()
  228. }
  229. loop.stopLock.Unlock()
  230. }
  231. // Terminate stops the loop and clears all active timeouts and intervals. After it returns there are no
  232. // active timers or goroutines associated with the loop. Any attempt to submit a task (by using RunOnLoop(),
  233. // SetTimeout() or SetInterval()) will not succeed.
  234. // After being terminated the loop can be restarted again by using Start() or Run().
  235. // This method must not be called concurrently with Stop*(), Start(), or Run().
  236. func (loop *EventLoop) Terminate() {
  237. loop.Stop()
  238. loop.auxJobsLock.Lock()
  239. loop.terminated = true
  240. loop.auxJobsLock.Unlock()
  241. loop.runAux()
  242. for i := 0; i < len(loop.jobs); i++ {
  243. job := loop.jobs[i]
  244. if !job.cancelled {
  245. job.cancelled = true
  246. if job.cancel() {
  247. loop.removeJob(job)
  248. i--
  249. }
  250. }
  251. }
  252. for len(loop.jobs) > 0 {
  253. (<-loop.jobChan)()
  254. }
  255. }
  256. // RunOnLoop schedules to run the specified function in the context of the loop as soon as possible.
  257. // The order of the runs is preserved (i.e. the functions will be called in the same order as calls to RunOnLoop())
  258. // The instance of goja.Runtime that is passed to the function and any Values derived from it must not be used
  259. // outside the function. It is safe to call inside or outside the loop.
  260. // Returns true on success or false if the loop is terminated (see Terminate()).
  261. func (loop *EventLoop) RunOnLoop(fn func(*goja.Runtime)) bool {
  262. return loop.addAuxJob(func() { fn(loop.vm) })
  263. }
  264. func (loop *EventLoop) runAux() {
  265. loop.auxJobsLock.Lock()
  266. jobs := loop.auxJobs
  267. loop.auxJobs = loop.auxJobsSpare
  268. loop.auxJobsLock.Unlock()
  269. for i, job := range jobs {
  270. job()
  271. jobs[i] = nil
  272. }
  273. loop.auxJobsSpare = jobs[:0]
  274. }
  275. func (loop *EventLoop) run(inBackground bool) {
  276. loop.runAux()
  277. if inBackground {
  278. loop.jobCount++
  279. }
  280. LOOP:
  281. for loop.jobCount > 0 {
  282. select {
  283. case job := <-loop.jobChan:
  284. job()
  285. case <-loop.wakeupChan:
  286. loop.runAux()
  287. if atomic.LoadInt32(&loop.canRun) == 0 {
  288. break LOOP
  289. }
  290. }
  291. }
  292. if inBackground {
  293. loop.jobCount--
  294. }
  295. loop.stopLock.Lock()
  296. loop.running = false
  297. loop.stopLock.Unlock()
  298. loop.stopCond.Broadcast()
  299. }
  300. func (loop *EventLoop) wakeup() {
  301. select {
  302. case loop.wakeupChan <- struct{}{}:
  303. default:
  304. }
  305. }
  306. func (loop *EventLoop) addAuxJob(fn func()) bool {
  307. loop.auxJobsLock.Lock()
  308. if loop.terminated {
  309. loop.auxJobsLock.Unlock()
  310. return false
  311. }
  312. loop.auxJobs = append(loop.auxJobs, fn)
  313. loop.auxJobsLock.Unlock()
  314. loop.wakeup()
  315. return true
  316. }
  317. func (loop *EventLoop) newTimeout(f func()) *Timer {
  318. t := &Timer{
  319. job: job{fn: f},
  320. }
  321. t.cancel = t.doCancel
  322. return t
  323. }
  324. func (t *Timer) start(loop *EventLoop, timeout time.Duration) {
  325. t.timer = time.AfterFunc(timeout, func() {
  326. loop.jobChan <- func() {
  327. loop.doTimeout(t)
  328. }
  329. })
  330. }
  331. func (loop *EventLoop) newInterval(f func()) *Interval {
  332. i := &Interval{
  333. job: job{fn: f},
  334. stopChan: make(chan struct{}),
  335. }
  336. i.cancel = i.doCancel
  337. return i
  338. }
  339. func (i *Interval) start(loop *EventLoop, timeout time.Duration) {
  340. // https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
  341. if timeout <= 0 {
  342. timeout = time.Millisecond
  343. }
  344. i.ticker = time.NewTicker(timeout)
  345. go i.run(loop)
  346. }
  347. func (loop *EventLoop) addImmediate(f func()) *Immediate {
  348. i := &Immediate{
  349. job: job{fn: f},
  350. }
  351. loop.addAuxJob(func() {
  352. loop.doImmediate(i)
  353. })
  354. return i
  355. }
  356. func (loop *EventLoop) doTimeout(t *Timer) {
  357. loop.removeJob(&t.job)
  358. if !t.cancelled {
  359. t.cancelled = true
  360. loop.jobCount--
  361. t.fn()
  362. }
  363. }
  364. func (loop *EventLoop) doInterval(i *Interval) {
  365. if !i.cancelled {
  366. i.fn()
  367. }
  368. }
  369. func (loop *EventLoop) doImmediate(i *Immediate) {
  370. if !i.cancelled {
  371. i.cancelled = true
  372. loop.jobCount--
  373. i.fn()
  374. }
  375. }
  376. func (loop *EventLoop) clearTimeout(t *Timer) {
  377. if t != nil && !t.cancelled {
  378. t.cancelled = true
  379. loop.jobCount--
  380. if t.doCancel() {
  381. loop.removeJob(&t.job)
  382. }
  383. }
  384. }
  385. func (loop *EventLoop) clearInterval(i *Interval) {
  386. if i != nil && !i.cancelled {
  387. i.cancelled = true
  388. loop.jobCount--
  389. i.doCancel()
  390. }
  391. }
  392. func (loop *EventLoop) removeJob(job *job) {
  393. idx := job.idx
  394. if idx < 0 {
  395. return
  396. }
  397. if idx < len(loop.jobs)-1 {
  398. loop.jobs[idx] = loop.jobs[len(loop.jobs)-1]
  399. loop.jobs[idx].idx = idx
  400. }
  401. loop.jobs[len(loop.jobs)-1] = nil
  402. loop.jobs = loop.jobs[:len(loop.jobs)-1]
  403. job.idx = -1
  404. }
  405. func (loop *EventLoop) clearImmediate(i *Immediate) {
  406. if i != nil && !i.cancelled {
  407. i.cancelled = true
  408. loop.jobCount--
  409. }
  410. }
  411. func (i *Interval) doCancel() bool {
  412. close(i.stopChan)
  413. return false
  414. }
  415. func (t *Timer) doCancel() bool {
  416. return t.timer.Stop()
  417. }
  418. func (i *Interval) run(loop *EventLoop) {
  419. L:
  420. for {
  421. select {
  422. case <-i.stopChan:
  423. i.ticker.Stop()
  424. break L
  425. case <-i.ticker.C:
  426. loop.jobChan <- func() {
  427. loop.doInterval(i)
  428. }
  429. }
  430. }
  431. loop.jobChan <- func() {
  432. loop.removeJob(&i.job)
  433. }
  434. }