rtmp-windows.c 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. #ifdef _WIN32
  2. #include "rtmp-stream.h"
  3. static void fatal_sock_shutdown(struct rtmp_stream *stream)
  4. {
  5. closesocket(stream->rtmp.m_sb.sb_socket);
  6. stream->rtmp.m_sb.sb_socket = -1;
  7. stream->write_buf_len = 0;
  8. os_event_signal(stream->buffer_space_available_event);
  9. }
  10. static bool socket_event(struct rtmp_stream *stream, bool *can_write, uint64_t last_send_time)
  11. {
  12. WSANETWORKEVENTS net_events;
  13. bool success;
  14. success = !WSAEnumNetworkEvents(stream->rtmp.m_sb.sb_socket, NULL, &net_events);
  15. if (!success) {
  16. blog(LOG_ERROR,
  17. "socket_thread_windows: Aborting due to "
  18. "WSAEnumNetworkEvents failure, %d",
  19. WSAGetLastError());
  20. fatal_sock_shutdown(stream);
  21. return false;
  22. }
  23. if (net_events.lNetworkEvents & FD_WRITE)
  24. *can_write = true;
  25. if (net_events.lNetworkEvents & FD_CLOSE) {
  26. if (last_send_time) {
  27. uint32_t diff = (os_gettime_ns() / 1000000) - last_send_time;
  28. blog(LOG_ERROR,
  29. "socket_thread_windows: Received "
  30. "FD_CLOSE, %u ms since last send "
  31. "(buffer: %d / %d)",
  32. diff, stream->write_buf_len, stream->write_buf_size);
  33. }
  34. if (os_event_try(stream->stop_event) != EAGAIN)
  35. blog(LOG_ERROR,
  36. "socket_thread_windows: Aborting due "
  37. "to FD_CLOSE during shutdown, "
  38. "%d bytes lost, error %d",
  39. stream->write_buf_len, net_events.iErrorCode[FD_CLOSE_BIT]);
  40. else
  41. blog(LOG_ERROR,
  42. "socket_thread_windows: Aborting due "
  43. "to FD_CLOSE, error %d",
  44. net_events.iErrorCode[FD_CLOSE_BIT]);
  45. fatal_sock_shutdown(stream);
  46. return false;
  47. }
  48. if (net_events.lNetworkEvents & FD_READ) {
  49. char discard[16384];
  50. int err_code;
  51. bool fatal = false;
  52. for (;;) {
  53. int ret = recv(stream->rtmp.m_sb.sb_socket, discard, sizeof(discard), 0);
  54. if (ret == -1) {
  55. err_code = WSAGetLastError();
  56. if (err_code == WSAEWOULDBLOCK)
  57. break;
  58. fatal = true;
  59. } else if (ret == 0) {
  60. err_code = 0;
  61. fatal = true;
  62. }
  63. if (fatal) {
  64. blog(LOG_ERROR,
  65. "socket_thread_windows: "
  66. "Socket error, recv() returned "
  67. "%d, GetLastError() %d",
  68. ret, err_code);
  69. stream->rtmp.last_error_code = err_code;
  70. fatal_sock_shutdown(stream);
  71. return false;
  72. }
  73. }
  74. }
  75. return true;
  76. }
  77. static void ideal_send_backlog_event(struct rtmp_stream *stream, bool *can_write)
  78. {
  79. ULONG ideal_send_backlog;
  80. int ret;
  81. ret = idealsendbacklogquery(stream->rtmp.m_sb.sb_socket, &ideal_send_backlog);
  82. if (ret == 0) {
  83. int cur_tcp_bufsize;
  84. int size = sizeof(cur_tcp_bufsize);
  85. ret = getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, (char *)&cur_tcp_bufsize, &size);
  86. if (ret == 0) {
  87. if (cur_tcp_bufsize < (int)ideal_send_backlog) {
  88. int bufsize = (int)ideal_send_backlog;
  89. setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, (const char *)&bufsize,
  90. sizeof(bufsize));
  91. blog(LOG_INFO,
  92. "socket_thread_windows: "
  93. "Increasing send buffer to "
  94. "ISB %d (buffer: %d / %d)",
  95. ideal_send_backlog, stream->write_buf_len, stream->write_buf_size);
  96. }
  97. } else {
  98. blog(LOG_ERROR,
  99. "socket_thread_windows: Got "
  100. "send_backlog_event but "
  101. "getsockopt() returned %d",
  102. WSAGetLastError());
  103. }
  104. } else {
  105. blog(LOG_ERROR,
  106. "socket_thread_windows: Got "
  107. "send_backlog_event but WSAIoctl() "
  108. "returned %d",
  109. WSAGetLastError());
  110. }
  111. }
  112. enum data_ret { RET_BREAK, RET_FATAL, RET_CONTINUE };
  113. static enum data_ret write_data(struct rtmp_stream *stream, bool *can_write, uint64_t *last_send_time,
  114. size_t latency_packet_size, int delay_time)
  115. {
  116. bool exit_loop = false;
  117. pthread_mutex_lock(&stream->write_buf_mutex);
  118. if (!stream->write_buf_len) {
  119. /* this is now an expected occasional condition due to use of
  120. * auto-reset events, we could end up emptying the buffer as
  121. * it's filled in a previous loop cycle, especially if using
  122. * low latency mode. */
  123. pthread_mutex_unlock(&stream->write_buf_mutex);
  124. /* blog(LOG_DEBUG, "socket_thread_windows: Trying to send, "
  125. "but no data available"); */
  126. return RET_BREAK;
  127. }
  128. int ret;
  129. if (stream->low_latency_mode) {
  130. size_t send_len = min(latency_packet_size, stream->write_buf_len);
  131. ret = RTMPSockBuf_Send(&stream->rtmp.m_sb, (const char *)stream->write_buf, (int)send_len);
  132. } else {
  133. ret = RTMPSockBuf_Send(&stream->rtmp.m_sb, (const char *)stream->write_buf, (int)stream->write_buf_len);
  134. }
  135. if (ret > 0) {
  136. if (stream->write_buf_len - ret)
  137. memmove(stream->write_buf, stream->write_buf + ret, stream->write_buf_len - ret);
  138. stream->write_buf_len -= ret;
  139. *last_send_time = os_gettime_ns() / 1000000;
  140. os_event_signal(stream->buffer_space_available_event);
  141. } else {
  142. int err_code;
  143. bool fatal_err = false;
  144. if (ret == -1) {
  145. err_code = WSAGetLastError();
  146. if (err_code == WSAEWOULDBLOCK) {
  147. *can_write = false;
  148. pthread_mutex_unlock(&stream->write_buf_mutex);
  149. return RET_BREAK;
  150. }
  151. fatal_err = true;
  152. } else if (ret == 0) {
  153. err_code = 0;
  154. fatal_err = true;
  155. }
  156. if (fatal_err) {
  157. /* connection closed, or connection was aborted /
  158. * socket closed / etc, that's a fatal error. */
  159. blog(LOG_ERROR,
  160. "socket_thread_windows: "
  161. "Socket error, send() returned %d, "
  162. "GetLastError() %d",
  163. ret, err_code);
  164. pthread_mutex_unlock(&stream->write_buf_mutex);
  165. stream->rtmp.last_error_code = err_code;
  166. fatal_sock_shutdown(stream);
  167. return RET_FATAL;
  168. }
  169. }
  170. /* finish writing for now */
  171. if (stream->write_buf_len <= 1000)
  172. exit_loop = true;
  173. pthread_mutex_unlock(&stream->write_buf_mutex);
  174. if (delay_time)
  175. os_sleep_ms(delay_time);
  176. return exit_loop ? RET_BREAK : RET_CONTINUE;
  177. }
  178. #define LATENCY_FACTOR 20
  179. static inline void socket_thread_windows_internal(struct rtmp_stream *stream)
  180. {
  181. bool can_write = false;
  182. int delay_time;
  183. size_t latency_packet_size;
  184. uint64_t last_send_time = 0;
  185. HANDLE send_backlog_event;
  186. OVERLAPPED send_backlog_overlapped;
  187. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  188. WSAEventSelect(stream->rtmp.m_sb.sb_socket, stream->socket_available_event, FD_READ | FD_WRITE | FD_CLOSE);
  189. send_backlog_event = CreateEvent(NULL, true, false, NULL);
  190. if (stream->low_latency_mode) {
  191. delay_time = 1000 / LATENCY_FACTOR;
  192. latency_packet_size = stream->write_buf_size / (LATENCY_FACTOR - 2);
  193. } else {
  194. latency_packet_size = stream->write_buf_size;
  195. delay_time = 0;
  196. }
  197. if (!stream->disable_send_window_optimization) {
  198. memset(&send_backlog_overlapped, 0, sizeof(send_backlog_overlapped));
  199. send_backlog_overlapped.hEvent = send_backlog_event;
  200. idealsendbacklognotify(stream->rtmp.m_sb.sb_socket, &send_backlog_overlapped, NULL);
  201. } else {
  202. blog(LOG_INFO, "socket_thread_windows: Send window "
  203. "optimization disabled by user.");
  204. }
  205. HANDLE objs[3];
  206. objs[0] = stream->socket_available_event;
  207. objs[1] = stream->buffer_has_data_event;
  208. objs[2] = send_backlog_event;
  209. for (;;) {
  210. if (os_event_try(stream->send_thread_signaled_exit) != EAGAIN) {
  211. pthread_mutex_lock(&stream->write_buf_mutex);
  212. if (stream->write_buf_len == 0) {
  213. //blog(LOG_DEBUG, "Exiting on empty buffer");
  214. pthread_mutex_unlock(&stream->write_buf_mutex);
  215. os_event_reset(stream->send_thread_signaled_exit);
  216. break;
  217. }
  218. pthread_mutex_unlock(&stream->write_buf_mutex);
  219. }
  220. int status = WaitForMultipleObjects(3, objs, false, INFINITE);
  221. if (status == WAIT_ABANDONED || status == WAIT_FAILED) {
  222. blog(LOG_ERROR, "socket_thread_windows: Aborting due "
  223. "to WaitForMultipleObjects failure");
  224. fatal_sock_shutdown(stream);
  225. return;
  226. }
  227. if (status == WAIT_OBJECT_0) {
  228. /* Socket event */
  229. if (!socket_event(stream, &can_write, last_send_time))
  230. return;
  231. } else if (status == WAIT_OBJECT_0 + 2) {
  232. /* Ideal send backlog event */
  233. ideal_send_backlog_event(stream, &can_write);
  234. ResetEvent(send_backlog_event);
  235. idealsendbacklognotify(stream->rtmp.m_sb.sb_socket, &send_backlog_overlapped, NULL);
  236. continue;
  237. }
  238. if (can_write) {
  239. for (;;) {
  240. enum data_ret ret = write_data(stream, &can_write, &last_send_time, latency_packet_size,
  241. delay_time);
  242. switch (ret) {
  243. case RET_BREAK:
  244. goto exit_write_loop;
  245. case RET_FATAL:
  246. return;
  247. case RET_CONTINUE:;
  248. }
  249. }
  250. }
  251. exit_write_loop:;
  252. }
  253. if (stream->rtmp.m_sb.sb_socket != INVALID_SOCKET)
  254. WSAEventSelect(stream->rtmp.m_sb.sb_socket, stream->socket_available_event, 0);
  255. blog(LOG_INFO, "socket_thread_windows: Normal exit");
  256. }
  257. void *socket_thread_windows(void *data)
  258. {
  259. struct rtmp_stream *stream = data;
  260. socket_thread_windows_internal(stream);
  261. return NULL;
  262. }
  263. #endif