/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;
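/* State for the select() helper thread that stands in for kqueue on fds
 * kqueue can't watch (see uv__stream_try_select below). fake_fd and int_fd
 * are the two ends of a socketpair used to wake the thread; sread/swrite
 * are heap-allocated fd_sets sized for the watched descriptors.
 */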
struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};
#endif /* defined(__APPLE__) */

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
static void uv__drain(uv_stream_t* stream);

void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;
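  /* Stash a spare fd that uv__emfile_trick() can close to free up a slot
   * when accept() fails with EMFILE. */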
  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      /* In the rare case that "/dev/null" isn't mounted, open "/"
       * instead.
       */
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt the select() loop.
   * NOTE: fake_fd and int_fd are a socketpair(), so writing to one end
   * raises a read event on the other.
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif /* !defined(__APPLE__) */
}

#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;
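  /* Poll fd (and the wakeup socketpair) with select(2) until
   * uv__stream_close() posts close_sem. */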
  for (;;) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      for (;;) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}

static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to post the semaphore here, otherwise `select()`
   * might be called before the actual `uv__read()`, leading to a blocking
   * syscall.
   */
  uv_sem_post(&s->async_sem);
}

static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}

int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from the /dev mount on macOS.
   * Run select(2) in a separate thread for those fds.
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;
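  /* Probe the fd by registering it with a throwaway kqueue: descriptors
   * that kqueue can't watch come back with EV_ERROR and data == EINVAL. */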
  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for io watcher and to interrupt the select() loop.
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];
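  /* Size each fd_set to hold bits for fds 0..max_fd, rounded up to a
   * 32-bit boundary (NBBY is the number of bits per byte). */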
  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);

  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */

  309. #endif /* defined(__APPLE__) */
  310. int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
  311. #if defined(__APPLE__)
  312. int enable;
  313. #endif
  314. if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
  315. return UV_EBUSY;
  316. assert(fd >= 0);
  317. stream->flags |= flags;
  318. if (stream->type == UV_TCP) {
  319. if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
  320. return UV__ERR(errno);
    /* TODO: use the delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}

void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  QUEUE* q;

  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_write_t, queue);
    req->error = error;

    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  }
}

void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);
  uv__drain(stream);

  assert(stream->write_queue_size == 0);
}

/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}

#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */
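/* With kqueue, w->rcount mirrors the pending-connection count reported by
 * the kernel; decrement it per accepted connection so uv__server_io() knows
 * when the backlog has been drained. */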
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
    if (w->rcount <= 0)
      return;
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return;  /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue;  /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      /* The user hasn't yet called uv_accept(). */
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }

    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      /* Give other processes a chance to accept connections. */
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}

#undef UV_DEC_BACKLOG

int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}

int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  if (uv__is_closing(stream)) {
    return UV_EINVAL;
  }

  switch (stream->type) {
    case UV_TCP:
      err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
      break;

    case UV_NAMED_PIPE:
      err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
      break;

    default:
      err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}

static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  if (!(stream->flags & UV_HANDLE_CLOSING)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (!(stream->flags & UV_HANDLE_SHUTTING))
    return;

  req = stream->shutdown_req;
  assert(req);

  if ((stream->flags & UV_HANDLE_CLOSING) ||
      !(stream->flags & UV_HANDLE_SHUT)) {
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (stream->flags & UV_HANDLE_CLOSING)
      /* The user destroyed the stream before we got to do the shutdown. */
      err = UV_ECANCELED;
    else if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);
    else /* Success. */
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}

static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}

static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}

/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;
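  /* Consume n written bytes across the buffers, adjusting base/len in place
   * so the next writev() resumes exactly where this one stopped. */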
  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}

static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  QUEUE_REMOVE(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}

static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

static int uv__try_write(uv_stream_t* stream,
                         const uv_buf_t bufs[],
                         unsigned int nbufs,
                         uv_stream_t* send_handle) {
  struct iovec* iov;
  int iovmax;
  int iovcnt;
  ssize_t n;

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  iov = (struct iovec*) bufs;
  iovcnt = nbufs;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */
  if (send_handle != NULL) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;
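    /* The union both reserves space for the control message and forces
     * struct cmsghdr alignment; the fd travels as SCM_RIGHTS ancillary data
     * alongside the regular payload. */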
    if (uv__is_closing(send_handle))
      return UV_EBADF;

    fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    /* silence aliasing warning */
    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && errno == EINTR);
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && errno == EINTR);
  }

  if (n >= 0)
    return n;

  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
    return UV_EAGAIN;
#ifdef __APPLE__
  /* macOS versions 10.10 and 10.15 - and presumably 10.11 to 10.14, too -
   * have a bug where a race condition causes the kernel to return EPROTOTYPE
   * because the socket isn't fully constructed. It's probably the result of
   * the peer closing the connection and that is why libuv translates it to
   * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
   * away but some VPN software causes the same behavior except the error is
   * permanent, not transient, turning the retry mechanism into an infinite
   * loop. See https://github.com/libuv/libuv/pull/482.
   */
  if (errno == EPROTOTYPE)
    return UV_ECONNRESET;
#endif /* __APPLE__ */

  return UV__ERR(errno);
}

static void uv__write(uv_stream_t* stream) {
  QUEUE* q;
  uv_write_t* req;
  ssize_t n;

  assert(uv__stream_fd(stream) >= 0);

  for (;;) {
    if (QUEUE_EMPTY(&stream->write_queue))
      return;

    q = QUEUE_HEAD(&stream->write_queue);
    req = QUEUE_DATA(q, uv_write_t, queue);
    assert(req->handle == stream);

    n = uv__try_write(stream,
                      &(req->bufs[req->write_index]),
                      req->nbufs - req->write_index,
                      req->send_handle);

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0) {
      req->send_handle = NULL;
      if (uv__write_req_update(stream, req, n)) {
        uv__write_req_finish(req);
        return;  /* TODO(bnoordhuis) Start trying to write the next request. */
      }
    } else if (n != UV_EAGAIN)
      break;

    /* If this is a blocking stream, try again. */
    if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
      continue;

    /* We're not done. */
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

    /* Notify select() thread about state change */
    uv__stream_osx_interrupt_select(stream);

    return;
  }

  req->error = n;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);
}

static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}

static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}

static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

    /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                             sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal, the sockets will be closed in uv__stream_close().
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}

#if defined(__PASE__)
/* On IBM i PASE, the control message length cannot exceed 256 bytes. */
# define UV__CMSG_FD_COUNT 60
#else
# define UV__CMSG_FD_COUNT 64
#endif
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))

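/* Walk the control messages attached to a recvmsg() result and collect any
 * file descriptors passed via SCM_RIGHTS: the first becomes
 * stream->accepted_fd, the rest are queued with uv__stream_queue_fd(). */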
static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    char* start;
    char* end;
    int err;
    void* pv;
    int* pi;
    unsigned int i;
    unsigned int count;

    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
              cmsg->cmsg_type);
      continue;
    }

    /* silence aliasing warning */
    pv = CMSG_DATA(cmsg);
    pi = pv;

    /* Count available fds */
    start = (char*) cmsg;
    end = (char*) cmsg + cmsg->cmsg_len;
    count = 0;
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
      count++;
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);

    for (i = 0; i < count; i++) {
      /* Already has accepted fd, queue now */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, pi[i]);
        if (err != 0) {
          /* Close rest */
          for (; i < count; i++)
            uv__close(pi[i]);
          return err;
        }
      } else {
        stream->accepted_fd = pi[i];
      }
    }
  }

  return 0;
}

#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
# pragma clang diagnostic ignored "-Wvla-extension"
#endif

static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }
    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
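      /* On z/OS, leftover ancillary data can be delivered on subsequent
       * recvmsg() calls; drain it with a zero-length iovec so no passed
       * descriptors are lost. */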
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}

#ifdef __clang__
# pragma clang diagnostic pop
#endif

#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE

int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request. The `shutdown(2)` call will always be deferred until
   * `uv__drain`, just before the callback is run. */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags |= UV_HANDLE_SHUTTING;
  stream->flags &= ~UV_HANDLE_WRITABLE;

  if (QUEUE_EMPTY(&stream->write_queue))
    uv__io_feed(stream->loop, &stream->io_watcher);

  return 0;
}

static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}

/**
 * We get called here directly following a call to connect(2).
 * In order to determine whether we've errored out or succeeded, we must
 * call getsockopt.
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over the differences between unixes, errors that were
     * reported synchronously on the first connect can be delayed until
     * the next tick--which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}

static int uv__check_before_write(uv_stream_t* stream,
                                  unsigned int nbufs,
                                  uv_stream_t* send_handle) {
  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle != NULL) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  return 0;
}

int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;
  int err;

  err = uv__check_before_write(stream, nbufs, send_handle);
  if (err < 0)
    return err;

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);
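  /* bufsml is a small inline array; fall back to the heap only when the
   * caller passes more buffers than it can hold. */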
  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /*
     * Blocking streams should never have anything in the queue.
     * If this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}

/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
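/* (The uv_buf_t array itself is copied into the request by uv_write2(),
 * but the memory each buffer points at is not.) */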
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}

int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  return uv_try_write2(stream, bufs, nbufs, NULL);
}

int uv_try_write2(uv_stream_t* stream,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_stream_t* send_handle) {
  int err;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;
  err = uv__check_before_write(stream, nbufs, send_handle);
  if (err < 0)
    return err;

  return uv__try_write(stream, bufs, nbufs, send_handle);
}

int uv__read_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  /* The UV_HANDLE_READING flag is independent of the state of the stream - it
   * just expresses the desired state of the user. */
  stream->flags |= UV_HANDLE_READING;
  stream->flags &= ~UV_HANDLE_READ_EOF;

  /* TODO: try to do the read inline? */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}

int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}

int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}

int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}

#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */

void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}

int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* Don't need to check the file descriptor, uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}