threadpool.c

/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */
  21. #include "uv-common.h"
  22. #if !defined(_WIN32)
  23. # include "unix/internal.h"
  24. #endif
  25. #include <stdlib.h>
  26. #define MAX_THREADPOOL_SIZE 1024
  27. static uv_once_t once = UV_ONCE_INIT;
  28. static uv_cond_t cond;
  29. static uv_mutex_t mutex;
  30. static unsigned int idle_threads;
  31. static unsigned int slow_io_work_running;
  32. static unsigned int nthreads;
  33. static uv_thread_t* threads;
  34. static uv_thread_t default_threads[4];
  35. static QUEUE exit_message;
  36. static QUEUE wq;
  37. static QUEUE run_slow_work_message;
  38. static QUEUE slow_io_pending_wq;
  39. static unsigned int slow_work_thread_threshold(void) {
  40. return (nthreads + 1) / 2;
  41. }
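
/* Added note (commentary, not from the upstream source): slow-I/O work
 * (kind UV__WORK_SLOW_IO) is throttled to roughly half the pool. With the
 * default of 4 threads the threshold is (4 + 1) / 2 = 2, so at most two
 * workers run slow I/O concurrently while the rest stay free for CPU-bound
 * and fast-I/O requests. */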


static void uv__cancelled(struct uv__work* w) {
  abort();
}
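
/* Added note (commentary): each worker loops on the shared `wq`, sleeping
 * on `cond` while the queue is empty or only throttled slow-I/O work
 * remains. It pops one item, runs the work callback with `mutex` released,
 * then appends the finished request to its owning loop's `wq` and wakes
 * that loop with uv_async_send() so the done callback runs on the loop
 * thread. */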

/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = QUEUE_HEAD(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    QUEUE_REMOVE(q);
    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        QUEUE_INSERT_TAIL(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (QUEUE_EMPTY(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = QUEUE_HEAD(&slow_io_pending_wq);
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);

    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}
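
/* Added note (commentary): post() is the single entry point onto the shared
 * queue. CPU and fast-I/O items go straight onto `wq`; slow-I/O items are
 * parked on `slow_io_pending_wq` and represented in `wq` by at most one
 * `run_slow_work_message` marker, which is what lets the worker loop above
 * throttle them. */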

static void post(QUEUE* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
    if (!QUEUE_EMPTY(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }

  QUEUE_INSERT_TAIL(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}


void uv__threadpool_cleanup(void) {
#ifndef _WIN32
  unsigned int i;

  if (nthreads == 0)
    return;

  post(&exit_message, UV__WORK_CPU);

  for (i = 0; i < nthreads; i++)
    if (uv_thread_join(threads + i))
      abort();

  if (threads != default_threads)
    uv__free(threads);

  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);

  threads = NULL;
  nthreads = 0;
#endif
}
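
/* Added note (commentary): the pool size comes from the UV_THREADPOOL_SIZE
 * environment variable, defaulting to 4 and clamped to at least 1 and at
 * most MAX_THREADPOOL_SIZE. For example, a hypothetical invocation
 *
 *   UV_THREADPOOL_SIZE=8 ./app
 *
 * would start eight workers; if allocating the larger thread array fails,
 * the code silently falls back to the four static slots. */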

static void init_threads(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  QUEUE_INIT(&wq);
  QUEUE_INIT(&slow_io_pending_wq);
  QUEUE_INIT(&run_slow_work_message);

  if (uv_sem_init(&sem, 0))
    abort();

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}


#ifndef _WIN32
static void reset_once(void) {
  uv_once_t child_once = UV_ONCE_INIT;
  memcpy(&once, &child_once, sizeof(child_once));
}
#endif


static void init_once(void) {
#ifndef _WIN32
  /* Re-initialize the threadpool after fork.
   * Note that this discards the global mutex and condition as well
   * as the work queue.
   */
  if (pthread_atfork(NULL, NULL, &reset_once))
    abort();
#endif
  init_threads();
}
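
/* Added note (commentary): the pool is created lazily. The first call to
 * uv__work_submit() runs init_once() exactly once via uv_once(), so no
 * worker threads exist until a request actually needs them. */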

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}
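
/* Added note (commentary): a request can be cancelled only while it is
 * still queued (its `wq` node is non-empty) and unclaimed (`w->work` is
 * still set); the QUEUE_INIT() and `w->work = NULL` signals in worker()
 * clear exactly those two conditions. A successfully cancelled request is
 * re-posted to the loop's finished queue with its work function swapped
 * for uv__cancelled so that uv__work_done() reports UV_ECANCELED. */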

static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
  if (cancelled)
    QUEUE_REMOVE(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (!cancelled)
    return UV_EBUSY;

  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}
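
/* Added note (commentary): this runs on the loop thread in response to
 * uv_async_send(). It moves the whole finished list into a local queue
 * while holding the lock, then invokes the done callbacks with no locks
 * held. */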

void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}


static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  req->work_cb(req);
}


static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  req = container_of(w, uv_work_t, work_req);
  uv__req_unregister(req->loop, req);

  if (req->after_work_cb == NULL)
    return;

  req->after_work_cb(req, err);
}


int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
  uv__work_submit(loop,
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
  return 0;
}
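
/* Illustrative usage sketch (added commentary; uv_queue_work(), uv_cancel()
 * and uv_default_loop() are the public libuv API, while the callback names
 * are hypothetical):
 *
 *   static void on_work(uv_work_t* req);              runs on a pool thread
 *   static void on_after(uv_work_t* req, int status); runs on the loop
 *       thread; status is 0, or UV_ECANCELED if the request was cancelled
 *       before a worker picked it up
 *
 *   uv_work_t req;
 *   uv_queue_work(uv_default_loop(), &req, on_work, on_after);
 *   ...
 *   uv_cancel((uv_req_t*) &req);   returns UV_EBUSY once work has started
 */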

int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
  case UV_FS:
    loop = ((uv_fs_t*) req)->loop;
    wreq = &((uv_fs_t*) req)->work_req;
    break;
  case UV_GETADDRINFO:
    loop = ((uv_getaddrinfo_t*) req)->loop;
    wreq = &((uv_getaddrinfo_t*) req)->work_req;
    break;
  case UV_GETNAMEINFO:
    loop = ((uv_getnameinfo_t*) req)->loop;
    wreq = &((uv_getnameinfo_t*) req)->work_req;
    break;
  case UV_RANDOM:
    loop = ((uv_random_t*) req)->loop;
    wreq = &((uv_random_t*) req)->work_req;
    break;
  case UV_WORK:
    loop = ((uv_work_t*) req)->loop;
    wreq = &((uv_work_t*) req)->work_req;
    break;
  default:
    return UV_EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}
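
/* Added note (commentary): only requests backed by the threadpool can be
 * cancelled here (filesystem, getaddrinfo, getnameinfo, random and user
 * work requests); anything else yields UV_EINVAL. Even on successful
 * cancellation the request's callback still fires, with UV_ECANCELED as
 * its status. */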