/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};

/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
 * EPROTOTYPE can be returned while trying to write to a socket that is
 * shutting down. If we retry the write, we should get the expected EPIPE
 * instead.
 */
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
     (errno == EMSGSIZE && send_handle != NULL))
#else
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
#endif /* defined(__APPLE__) */

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);

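/* Initialize the state common to all stream types. Note that this also
 * lazily opens the loop's spare "emfile" descriptor, which is consumed by
 * uv__emfile_trick() below to recover from accept() EMFILE errors.
 */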
void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      /* In the rare case that "/dev/null" isn't mounted, open "/"
       * instead.
       */
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt select() loop
   * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
   * emit read event on other side
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else  /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif  /* !defined(__APPLE__) */
}

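/* Thread entry point for the select(2) fallback: watch s->fd with select()
 * and forward readiness events to the loop thread through s->async. The
 * int_fd end of the socketpair exists only so a pending select() call can
 * be interrupted by uv__stream_osx_interrupt_select().
 */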
#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  while (1) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      while (1) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}

static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to post the semaphore here, otherwise `select()`
   * might be called before the actual `uv__read()`, leading to the blocking
   * syscall
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}

int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from the /dev mount on OS X;
   * use select(2) in a separate thread for those fds
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for io watcher and to interrupt the select() loop.
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];
  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);

  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */

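/* Attach an already-open file descriptor to the stream and apply any
 * socket options (TCP_NODELAY, keepalive, SO_OOBINLINE on OS X) that were
 * requested before the descriptor existed.
 */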
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO Use delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}

void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  QUEUE* q;

  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_write_t, queue);
    req->error = error;

    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  }
}

void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);

  if (stream->shutdown_req) {
    /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
     * fait accompli at this point. Maybe we should revisit this in v0.11.
     * A possible reason for leaving it unchanged is that it informs the
     * callee that the handle has been destroyed.
     */
    uv__req_unregister(stream->loop, stream->shutdown_req);
    stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
    stream->shutdown_req = NULL;
  }

  assert(stream->write_queue_size == 0);
}

/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}

#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */


void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
    if (w->rcount <= 0)
      return;
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return;  /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue;  /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      /* The user hasn't yet called uv_accept(); stop reading until then. */
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }

    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      /* Give other processes a chance to accept connections. */
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}


#undef UV_DEC_BACKLOG

int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}

int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  switch (stream->type) {
    case UV_TCP:
      err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
      break;

    case UV_NAMED_PIPE:
      err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
      break;

    default:
      err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}

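/* Called when the write queue is empty: stop polling for POLLOUT and
 * complete a pending shutdown request, if any, with shutdown(2).
 */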
static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  /* Shutdown? */
  if ((stream->flags & UV_HANDLE_SHUTTING) &&
      !(stream->flags & UV_HANDLE_CLOSING) &&
      !(stream->flags & UV_HANDLE_SHUT)) {
    assert(stream->shutdown_req);

    req = stream->shutdown_req;
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);

    if (err == 0)
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}

static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}

/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}

static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  QUEUE_REMOVE(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}

static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

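/* Write out as much queued data as the kernel will accept. Requests that
 * carry a handle to pass (req->send_handle) go through sendmsg(2) with
 * SCM_RIGHTS ancillary data; plain requests go through write(2)/writev(2).
 */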
static void uv__write(uv_stream_t* stream) {
  struct iovec* iov;
  QUEUE* q;
  uv_write_t* req;
  int iovmax;
  int iovcnt;
  ssize_t n;
  int err;

start:

  assert(uv__stream_fd(stream) >= 0);

  if (QUEUE_EMPTY(&stream->write_queue))
    return;

  q = QUEUE_HEAD(&stream->write_queue);
  req = QUEUE_DATA(q, uv_write_t, queue);
  assert(req->handle == stream);

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
  iov = (struct iovec*) &(req->bufs[req->write_index]);
  iovcnt = req->nbufs - req->write_index;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */
  if (req->send_handle) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;

    if (uv__is_closing(req->send_handle)) {
      err = UV_EBADF;
      goto error;
    }

    fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    /* silence aliasing warning */
    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0)
      req->send_handle = NULL;
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
  }

  if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
    err = UV__ERR(errno);
    goto error;
  }

  if (n >= 0 && uv__write_req_update(stream, req, n)) {
    uv__write_req_finish(req);
    return;  /* TODO(bnoordhuis) Start trying to write the next request. */
  }

  /* If this is a blocking stream, try again. */
  if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
    goto start;

  /* We're not done. */
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

  /* Notify select() thread about state change */
  uv__stream_osx_interrupt_select(stream);

  return;

error:
  req->error = err;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  if (!uv__io_active(&stream->io_watcher, POLLIN))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
}

static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}

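/* Infer the handle type of a file descriptor from its socket domain and
 * type. Returns UV_UNKNOWN_HANDLE for descriptors that are not sockets.
 */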
uv_handle_type uv__handle_type(int fd) {
  struct sockaddr_storage ss;
  socklen_t sslen;
  socklen_t len;
  int type;

  memset(&ss, 0, sizeof(ss));
  sslen = sizeof(ss);

  if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
    return UV_UNKNOWN_HANDLE;

  len = sizeof type;

  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
    return UV_UNKNOWN_HANDLE;

  if (type == SOCK_STREAM) {
#if defined(_AIX) || defined(__DragonFly__)
    /* on AIX/DragonFly the getsockname call returns an empty sa structure
     * for sockets of type AF_UNIX. For all other types it will
     * return a properly filled in structure.
     */
    if (sslen == 0)
      return UV_NAMED_PIPE;
#endif
    switch (ss.ss_family) {
      case AF_UNIX:
        return UV_NAMED_PIPE;
      case AF_INET:
      case AF_INET6:
        return UV_TCP;
    }
  }

  if (type == SOCK_DGRAM &&
      (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
    return UV_UDP;

  return UV_UNKNOWN_HANDLE;
}

static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
  stream->flags &= ~UV_HANDLE_READING;
}

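/* Stash a file descriptor received over IPC when stream->accepted_fd is
 * already occupied. The queue grows in increments of 8 entries.
 */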
static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

  /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                             sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal - sockets will be closed in uv__stream_close
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}

#define UV__CMSG_FD_COUNT 64
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))

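/* Scan the recvmsg(2) control buffer for SCM_RIGHTS messages and take
 * ownership of any file descriptors they carry.
 */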
static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    char* start;
    char* end;
    int err;
    void* pv;
    int* pi;
    unsigned int i;
    unsigned int count;

    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
              cmsg->cmsg_type);
      continue;
    }

    /* silence aliasing warning */
    pv = CMSG_DATA(cmsg);
    pi = pv;

    /* Count available fds */
    start = (char*) cmsg;
    end = (char*) cmsg + cmsg->cmsg_len;
    count = 0;
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
      count++;
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);

    for (i = 0; i < count; i++) {
      /* Already has accepted fd, queue now */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, pi[i]);
        if (err != 0) {
          /* Close rest */
          for (; i < count; i++)
            uv__close(pi[i]);
          return err;
        }
      } else {
        stream->accepted_fd = pi[i];
      }
    }
  }

  return 0;
}

#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
# pragma clang diagnostic ignored "-Wvla-extension"
#endif

static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          if (!uv__io_active(&stream->io_watcher, POLLOUT))
            uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}


#ifdef __clang__
# pragma clang diagnostic pop
#endif

#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE

int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags |= UV_HANDLE_SHUTTING;

  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}

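/* I/O watcher callback for connected streams: finishes an in-flight
 * connect if there is one, then dispatches readable and writable events
 * to uv__read() and uv__write().
 */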
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}

/**
 * We get called here directly following a call to connect(2).
 * In order to determine whether we've errored out or succeeded, we must
 * call getsockopt.
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over the differences between unixes, errors that were
     * reported synchronously on the first connect can be delayed until
     * the next tick--which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}

int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;

  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /*
     * blocking streams should never have anything in the queue.
     * if this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}

/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}

void uv_try_write_cb(uv_write_t* req, int status) {
  /* Should not be called */
  abort();
}

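/* Synchronous best-effort write. Enqueues a temporary request through
 * uv_write(), then immediately unqueues whatever part did not complete.
 * Returns the number of bytes written, or UV_EAGAIN (or the request's
 * error) when nothing could be written.
 */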
int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  int r;
  int has_pollout;
  size_t written;
  size_t req_size;
  uv_write_t req;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);

  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
  if (r != 0)
    return r;

  /* Remove the bytes that were not written from the write queue size */
  written = uv__count_bufs(bufs, nbufs);
  if (req.bufs != NULL)
    req_size = uv__write_req_size(&req);
  else
    req_size = 0;
  written -= req_size;
  stream->write_queue_size -= req_size;

  /* Unqueue the request, regardless of whether it completed immediately */
  QUEUE_REMOVE(&req.queue);
  uv__req_unregister(stream->loop, &req);
  if (req.bufs != req.bufsml)
    uv__free(req.bufs);
  req.bufs = NULL;

  /* Do not poll for writable if we weren't polling before this call */
  if (!has_pollout) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (written == 0 && req_size != 0)
    return req.error < 0 ? req.error : UV_EAGAIN;
  else
    return written;
}

int uv_read_start(uv_stream_t* stream,
                  uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  if (stream->flags & UV_HANDLE_CLOSING)
    return UV_EINVAL;

  if (!(stream->flags & UV_HANDLE_READABLE))
    return UV_ENOTCONN;

  /* The UV_HANDLE_READING flag is independent of the state of the stream;
   * it just expresses the desired state of the user.
   */
  stream->flags |= UV_HANDLE_READING;

  /* TODO: try to do the read inline? */
  /* TODO: keep track of tcp state. If we've gotten a EOF then we should
   * not start the IO watcher.
   */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}

int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */

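/* Tear the stream down on uv_close(): terminate the OS X select() helper
 * thread if present, close the descriptor (unless it refers to stdio), and
 * close any accepted or queued fds that were never handed to the user.
 */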
void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* Don't need to check the file descriptor, uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}