rtmp-windows.c 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. #ifdef _WIN32
  2. #include "rtmp-stream.h"
  3. static void fatal_sock_shutdown(struct rtmp_stream *stream)
  4. {
  5. closesocket(stream->rtmp.m_sb.sb_socket);
  6. stream->rtmp.m_sb.sb_socket = -1;
  7. stream->write_buf_len = 0;
  8. os_event_signal(stream->buffer_space_available_event);
  9. }
  10. static bool socket_event(struct rtmp_stream *stream, bool *can_write,
  11. uint64_t last_send_time)
  12. {
  13. WSANETWORKEVENTS net_events;
  14. bool success;
  15. success = !WSAEnumNetworkEvents(stream->rtmp.m_sb.sb_socket, NULL,
  16. &net_events);
  17. if (!success) {
  18. blog(LOG_ERROR,
  19. "socket_thread_windows: Aborting due to "
  20. "WSAEnumNetworkEvents failure, %d",
  21. WSAGetLastError());
  22. fatal_sock_shutdown(stream);
  23. return false;
  24. }
  25. if (net_events.lNetworkEvents & FD_WRITE)
  26. *can_write = true;
  27. if (net_events.lNetworkEvents & FD_CLOSE) {
  28. if (last_send_time) {
  29. uint32_t diff =
  30. (os_gettime_ns() / 1000000) - last_send_time;
  31. blog(LOG_ERROR,
  32. "socket_thread_windows: Received "
  33. "FD_CLOSE, %u ms since last send "
  34. "(buffer: %d / %d)",
  35. diff, stream->write_buf_len,
  36. stream->write_buf_size);
  37. }
  38. if (os_event_try(stream->stop_event) != EAGAIN)
  39. blog(LOG_ERROR,
  40. "socket_thread_windows: Aborting due "
  41. "to FD_CLOSE during shutdown, "
  42. "%d bytes lost, error %d",
  43. stream->write_buf_len,
  44. net_events.iErrorCode[FD_CLOSE_BIT]);
  45. else
  46. blog(LOG_ERROR,
  47. "socket_thread_windows: Aborting due "
  48. "to FD_CLOSE, error %d",
  49. net_events.iErrorCode[FD_CLOSE_BIT]);
  50. fatal_sock_shutdown(stream);
  51. return false;
  52. }
  53. if (net_events.lNetworkEvents & FD_READ) {
  54. char discard[16384];
  55. int err_code;
  56. bool fatal = false;
  57. for (;;) {
  58. int ret = recv(stream->rtmp.m_sb.sb_socket, discard,
  59. sizeof(discard), 0);
  60. if (ret == -1) {
  61. err_code = WSAGetLastError();
  62. if (err_code == WSAEWOULDBLOCK)
  63. break;
  64. fatal = true;
  65. } else if (ret == 0) {
  66. err_code = 0;
  67. fatal = true;
  68. }
  69. if (fatal) {
  70. blog(LOG_ERROR,
  71. "socket_thread_windows: "
  72. "Socket error, recv() returned "
  73. "%d, GetLastError() %d",
  74. ret, err_code);
  75. stream->rtmp.last_error_code = err_code;
  76. fatal_sock_shutdown(stream);
  77. return false;
  78. }
  79. }
  80. }
  81. return true;
  82. }
  83. static void ideal_send_backlog_event(struct rtmp_stream *stream,
  84. bool *can_write)
  85. {
  86. ULONG ideal_send_backlog;
  87. int ret;
  88. ret = idealsendbacklogquery(stream->rtmp.m_sb.sb_socket,
  89. &ideal_send_backlog);
  90. if (ret == 0) {
  91. int cur_tcp_bufsize;
  92. int size = sizeof(cur_tcp_bufsize);
  93. ret = getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
  94. SO_SNDBUF, (char *)&cur_tcp_bufsize, &size);
  95. if (ret == 0) {
  96. if (cur_tcp_bufsize < (int)ideal_send_backlog) {
  97. int bufsize = (int)ideal_send_backlog;
  98. setsockopt(stream->rtmp.m_sb.sb_socket,
  99. SOL_SOCKET, SO_SNDBUF,
  100. (const char *)&bufsize,
  101. sizeof(bufsize));
  102. blog(LOG_INFO,
  103. "socket_thread_windows: "
  104. "Increasing send buffer to "
  105. "ISB %d (buffer: %d / %d)",
  106. ideal_send_backlog, stream->write_buf_len,
  107. stream->write_buf_size);
  108. }
  109. } else {
  110. blog(LOG_ERROR,
  111. "socket_thread_windows: Got "
  112. "send_backlog_event but "
  113. "getsockopt() returned %d",
  114. WSAGetLastError());
  115. }
  116. } else {
  117. blog(LOG_ERROR,
  118. "socket_thread_windows: Got "
  119. "send_backlog_event but WSAIoctl() "
  120. "returned %d",
  121. WSAGetLastError());
  122. }
  123. }
  124. enum data_ret { RET_BREAK, RET_FATAL, RET_CONTINUE };
  125. static enum data_ret write_data(struct rtmp_stream *stream, bool *can_write,
  126. uint64_t *last_send_time,
  127. size_t latency_packet_size, int delay_time)
  128. {
  129. bool exit_loop = false;
  130. pthread_mutex_lock(&stream->write_buf_mutex);
  131. if (!stream->write_buf_len) {
  132. /* this is now an expected occasional condition due to use of
  133. * auto-reset events, we could end up emptying the buffer as
  134. * it's filled in a previous loop cycle, especially if using
  135. * low latency mode. */
  136. pthread_mutex_unlock(&stream->write_buf_mutex);
  137. /* blog(LOG_DEBUG, "socket_thread_windows: Trying to send, "
  138. "but no data available"); */
  139. return RET_BREAK;
  140. }
  141. int ret;
  142. if (stream->low_latency_mode) {
  143. size_t send_len =
  144. min(latency_packet_size, stream->write_buf_len);
  145. ret = RTMPSockBuf_Send(&stream->rtmp.m_sb,
  146. (const char *)stream->write_buf,
  147. (int)send_len);
  148. } else {
  149. ret = RTMPSockBuf_Send(&stream->rtmp.m_sb,
  150. (const char *)stream->write_buf,
  151. (int)stream->write_buf_len);
  152. }
  153. if (ret > 0) {
  154. if (stream->write_buf_len - ret)
  155. memmove(stream->write_buf, stream->write_buf + ret,
  156. stream->write_buf_len - ret);
  157. stream->write_buf_len -= ret;
  158. *last_send_time = os_gettime_ns() / 1000000;
  159. os_event_signal(stream->buffer_space_available_event);
  160. } else {
  161. int err_code;
  162. bool fatal_err = false;
  163. if (ret == -1) {
  164. err_code = WSAGetLastError();
  165. if (err_code == WSAEWOULDBLOCK) {
  166. *can_write = false;
  167. pthread_mutex_unlock(&stream->write_buf_mutex);
  168. return RET_BREAK;
  169. }
  170. fatal_err = true;
  171. } else if (ret == 0) {
  172. err_code = 0;
  173. fatal_err = true;
  174. }
  175. if (fatal_err) {
  176. /* connection closed, or connection was aborted /
  177. * socket closed / etc, that's a fatal error. */
  178. blog(LOG_ERROR,
  179. "socket_thread_windows: "
  180. "Socket error, send() returned %d, "
  181. "GetLastError() %d",
  182. ret, err_code);
  183. pthread_mutex_unlock(&stream->write_buf_mutex);
  184. stream->rtmp.last_error_code = err_code;
  185. fatal_sock_shutdown(stream);
  186. return RET_FATAL;
  187. }
  188. }
  189. /* finish writing for now */
  190. if (stream->write_buf_len <= 1000)
  191. exit_loop = true;
  192. pthread_mutex_unlock(&stream->write_buf_mutex);
  193. if (delay_time)
  194. os_sleep_ms(delay_time);
  195. return exit_loop ? RET_BREAK : RET_CONTINUE;
  196. }
  197. #define LATENCY_FACTOR 20
  198. static inline void socket_thread_windows_internal(struct rtmp_stream *stream)
  199. {
  200. bool can_write = false;
  201. int delay_time;
  202. size_t latency_packet_size;
  203. uint64_t last_send_time = 0;
  204. HANDLE send_backlog_event;
  205. OVERLAPPED send_backlog_overlapped;
  206. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  207. WSAEventSelect(stream->rtmp.m_sb.sb_socket,
  208. stream->socket_available_event,
  209. FD_READ | FD_WRITE | FD_CLOSE);
  210. send_backlog_event = CreateEvent(NULL, true, false, NULL);
  211. if (stream->low_latency_mode) {
  212. delay_time = 1000 / LATENCY_FACTOR;
  213. latency_packet_size =
  214. stream->write_buf_size / (LATENCY_FACTOR - 2);
  215. } else {
  216. latency_packet_size = stream->write_buf_size;
  217. delay_time = 0;
  218. }
  219. if (!stream->disable_send_window_optimization) {
  220. memset(&send_backlog_overlapped, 0,
  221. sizeof(send_backlog_overlapped));
  222. send_backlog_overlapped.hEvent = send_backlog_event;
  223. idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
  224. &send_backlog_overlapped, NULL);
  225. } else {
  226. blog(LOG_INFO, "socket_thread_windows: Send window "
  227. "optimization disabled by user.");
  228. }
  229. HANDLE objs[3];
  230. objs[0] = stream->socket_available_event;
  231. objs[1] = stream->buffer_has_data_event;
  232. objs[2] = send_backlog_event;
  233. for (;;) {
  234. if (os_event_try(stream->send_thread_signaled_exit) != EAGAIN) {
  235. pthread_mutex_lock(&stream->write_buf_mutex);
  236. if (stream->write_buf_len == 0) {
  237. //blog(LOG_DEBUG, "Exiting on empty buffer");
  238. pthread_mutex_unlock(&stream->write_buf_mutex);
  239. os_event_reset(
  240. stream->send_thread_signaled_exit);
  241. break;
  242. }
  243. pthread_mutex_unlock(&stream->write_buf_mutex);
  244. }
  245. int status = WaitForMultipleObjects(3, objs, false, INFINITE);
  246. if (status == WAIT_ABANDONED || status == WAIT_FAILED) {
  247. blog(LOG_ERROR, "socket_thread_windows: Aborting due "
  248. "to WaitForMultipleObjects failure");
  249. fatal_sock_shutdown(stream);
  250. return;
  251. }
  252. if (status == WAIT_OBJECT_0) {
  253. /* Socket event */
  254. if (!socket_event(stream, &can_write, last_send_time))
  255. return;
  256. } else if (status == WAIT_OBJECT_0 + 2) {
  257. /* Ideal send backlog event */
  258. ideal_send_backlog_event(stream, &can_write);
  259. ResetEvent(send_backlog_event);
  260. idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
  261. &send_backlog_overlapped, NULL);
  262. continue;
  263. }
  264. if (can_write) {
  265. for (;;) {
  266. enum data_ret ret = write_data(
  267. stream, &can_write, &last_send_time,
  268. latency_packet_size, delay_time);
  269. switch (ret) {
  270. case RET_BREAK:
  271. goto exit_write_loop;
  272. case RET_FATAL:
  273. return;
  274. case RET_CONTINUE:;
  275. }
  276. }
  277. }
  278. exit_write_loop:;
  279. }
  280. if (stream->rtmp.m_sb.sb_socket != INVALID_SOCKET)
  281. WSAEventSelect(stream->rtmp.m_sb.sb_socket,
  282. stream->socket_available_event, 0);
  283. blog(LOG_INFO, "socket_thread_windows: Normal exit");
  284. }
  285. void *socket_thread_windows(void *data)
  286. {
  287. struct rtmp_stream *stream = data;
  288. socket_thread_windows_internal(stream);
  289. return NULL;
  290. }
  291. #endif