rtmp-windows.c 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. #ifdef _WIN32
  2. #include "rtmp-stream.h"
  3. #include <winsock2.h>
  4. static void fatal_sock_shutdown(struct rtmp_stream *stream)
  5. {
  6. closesocket(stream->rtmp.m_sb.sb_socket);
  7. stream->rtmp.m_sb.sb_socket = -1;
  8. stream->write_buf_len = 0;
  9. os_event_signal(stream->buffer_space_available_event);
  10. }
  11. static bool socket_event(struct rtmp_stream *stream, bool *can_write,
  12. uint64_t last_send_time)
  13. {
  14. WSANETWORKEVENTS net_events;
  15. bool success;
  16. success = !WSAEnumNetworkEvents(stream->rtmp.m_sb.sb_socket, NULL,
  17. &net_events);
  18. if (!success) {
  19. blog(LOG_ERROR,
  20. "socket_thread_windows: Aborting due to "
  21. "WSAEnumNetworkEvents failure, %d",
  22. WSAGetLastError());
  23. fatal_sock_shutdown(stream);
  24. return false;
  25. }
  26. if (net_events.lNetworkEvents & FD_WRITE)
  27. *can_write = true;
  28. if (net_events.lNetworkEvents & FD_CLOSE) {
  29. if (last_send_time) {
  30. uint32_t diff =
  31. (os_gettime_ns() / 1000000) - last_send_time;
  32. blog(LOG_ERROR,
  33. "socket_thread_windows: Received "
  34. "FD_CLOSE, %u ms since last send "
  35. "(buffer: %d / %d)",
  36. diff, stream->write_buf_len,
  37. stream->write_buf_size);
  38. }
  39. if (os_event_try(stream->stop_event) != EAGAIN)
  40. blog(LOG_ERROR,
  41. "socket_thread_windows: Aborting due "
  42. "to FD_CLOSE during shutdown, "
  43. "%d bytes lost, error %d",
  44. stream->write_buf_len,
  45. net_events.iErrorCode[FD_CLOSE_BIT]);
  46. else
  47. blog(LOG_ERROR,
  48. "socket_thread_windows: Aborting due "
  49. "to FD_CLOSE, error %d",
  50. net_events.iErrorCode[FD_CLOSE_BIT]);
  51. fatal_sock_shutdown(stream);
  52. return false;
  53. }
  54. if (net_events.lNetworkEvents & FD_READ) {
  55. char discard[16384];
  56. int err_code;
  57. bool fatal = false;
  58. for (;;) {
  59. int ret = recv(stream->rtmp.m_sb.sb_socket, discard,
  60. sizeof(discard), 0);
  61. if (ret == -1) {
  62. err_code = WSAGetLastError();
  63. if (err_code == WSAEWOULDBLOCK)
  64. break;
  65. fatal = true;
  66. } else if (ret == 0) {
  67. err_code = 0;
  68. fatal = true;
  69. }
  70. if (fatal) {
  71. blog(LOG_ERROR,
  72. "socket_thread_windows: "
  73. "Socket error, recv() returned "
  74. "%d, GetLastError() %d",
  75. ret, err_code);
  76. stream->rtmp.last_error_code = err_code;
  77. fatal_sock_shutdown(stream);
  78. return false;
  79. }
  80. }
  81. }
  82. return true;
  83. }
  84. static void ideal_send_backlog_event(struct rtmp_stream *stream,
  85. bool *can_write)
  86. {
  87. ULONG ideal_send_backlog;
  88. int ret;
  89. ret = idealsendbacklogquery(stream->rtmp.m_sb.sb_socket,
  90. &ideal_send_backlog);
  91. if (ret == 0) {
  92. int cur_tcp_bufsize;
  93. int size = sizeof(cur_tcp_bufsize);
  94. ret = getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
  95. SO_SNDBUF, (char *)&cur_tcp_bufsize, &size);
  96. if (ret == 0) {
  97. if (cur_tcp_bufsize < (int)ideal_send_backlog) {
  98. int bufsize = (int)ideal_send_backlog;
  99. setsockopt(stream->rtmp.m_sb.sb_socket,
  100. SOL_SOCKET, SO_SNDBUF,
  101. (const char *)&bufsize,
  102. sizeof(bufsize));
  103. blog(LOG_INFO,
  104. "socket_thread_windows: "
  105. "Increasing send buffer to "
  106. "ISB %d (buffer: %d / %d)",
  107. ideal_send_backlog, stream->write_buf_len,
  108. stream->write_buf_size);
  109. }
  110. } else {
  111. blog(LOG_ERROR,
  112. "socket_thread_windows: Got "
  113. "send_backlog_event but "
  114. "getsockopt() returned %d",
  115. WSAGetLastError());
  116. }
  117. } else {
  118. blog(LOG_ERROR,
  119. "socket_thread_windows: Got "
  120. "send_backlog_event but WSAIoctl() "
  121. "returned %d",
  122. WSAGetLastError());
  123. }
  124. }
  125. enum data_ret { RET_BREAK, RET_FATAL, RET_CONTINUE };
  126. static enum data_ret write_data(struct rtmp_stream *stream, bool *can_write,
  127. uint64_t *last_send_time,
  128. size_t latency_packet_size, int delay_time)
  129. {
  130. bool exit_loop = false;
  131. pthread_mutex_lock(&stream->write_buf_mutex);
  132. if (!stream->write_buf_len) {
  133. /* this is now an expected occasional condition due to use of
  134. * auto-reset events, we could end up emptying the buffer as
  135. * it's filled in a previous loop cycle, especially if using
  136. * low latency mode. */
  137. pthread_mutex_unlock(&stream->write_buf_mutex);
  138. /* blog(LOG_DEBUG, "socket_thread_windows: Trying to send, "
  139. "but no data available"); */
  140. return RET_BREAK;
  141. }
  142. int ret;
  143. if (stream->low_latency_mode) {
  144. size_t send_len =
  145. min(latency_packet_size, stream->write_buf_len);
  146. ret = RTMPSockBuf_Send(&stream->rtmp.m_sb,
  147. (const char *)stream->write_buf,
  148. (int)send_len);
  149. } else {
  150. ret = RTMPSockBuf_Send(&stream->rtmp.m_sb,
  151. (const char *)stream->write_buf,
  152. (int)stream->write_buf_len);
  153. }
  154. if (ret > 0) {
  155. if (stream->write_buf_len - ret)
  156. memmove(stream->write_buf, stream->write_buf + ret,
  157. stream->write_buf_len - ret);
  158. stream->write_buf_len -= ret;
  159. *last_send_time = os_gettime_ns() / 1000000;
  160. os_event_signal(stream->buffer_space_available_event);
  161. } else {
  162. int err_code;
  163. bool fatal_err = false;
  164. if (ret == -1) {
  165. err_code = WSAGetLastError();
  166. if (err_code == WSAEWOULDBLOCK) {
  167. *can_write = false;
  168. pthread_mutex_unlock(&stream->write_buf_mutex);
  169. return RET_BREAK;
  170. }
  171. fatal_err = true;
  172. } else if (ret == 0) {
  173. err_code = 0;
  174. fatal_err = true;
  175. }
  176. if (fatal_err) {
  177. /* connection closed, or connection was aborted /
  178. * socket closed / etc, that's a fatal error. */
  179. blog(LOG_ERROR,
  180. "socket_thread_windows: "
  181. "Socket error, send() returned %d, "
  182. "GetLastError() %d",
  183. ret, err_code);
  184. pthread_mutex_unlock(&stream->write_buf_mutex);
  185. stream->rtmp.last_error_code = err_code;
  186. fatal_sock_shutdown(stream);
  187. return RET_FATAL;
  188. }
  189. }
  190. /* finish writing for now */
  191. if (stream->write_buf_len <= 1000)
  192. exit_loop = true;
  193. pthread_mutex_unlock(&stream->write_buf_mutex);
  194. if (delay_time)
  195. os_sleep_ms(delay_time);
  196. return exit_loop ? RET_BREAK : RET_CONTINUE;
  197. }
  198. #define LATENCY_FACTOR 20
  199. static inline void socket_thread_windows_internal(struct rtmp_stream *stream)
  200. {
  201. bool can_write = false;
  202. int delay_time;
  203. size_t latency_packet_size;
  204. uint64_t last_send_time = 0;
  205. HANDLE send_backlog_event;
  206. OVERLAPPED send_backlog_overlapped;
  207. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  208. WSAEventSelect(stream->rtmp.m_sb.sb_socket,
  209. stream->socket_available_event,
  210. FD_READ | FD_WRITE | FD_CLOSE);
  211. send_backlog_event = CreateEvent(NULL, true, false, NULL);
  212. if (stream->low_latency_mode) {
  213. delay_time = 1000 / LATENCY_FACTOR;
  214. latency_packet_size =
  215. stream->write_buf_size / (LATENCY_FACTOR - 2);
  216. } else {
  217. latency_packet_size = stream->write_buf_size;
  218. delay_time = 0;
  219. }
  220. if (!stream->disable_send_window_optimization) {
  221. memset(&send_backlog_overlapped, 0,
  222. sizeof(send_backlog_overlapped));
  223. send_backlog_overlapped.hEvent = send_backlog_event;
  224. idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
  225. &send_backlog_overlapped, NULL);
  226. } else {
  227. blog(LOG_INFO, "socket_thread_windows: Send window "
  228. "optimization disabled by user.");
  229. }
  230. HANDLE objs[3];
  231. objs[0] = stream->socket_available_event;
  232. objs[1] = stream->buffer_has_data_event;
  233. objs[2] = send_backlog_event;
  234. for (;;) {
  235. if (os_event_try(stream->send_thread_signaled_exit) != EAGAIN) {
  236. pthread_mutex_lock(&stream->write_buf_mutex);
  237. if (stream->write_buf_len == 0) {
  238. //blog(LOG_DEBUG, "Exiting on empty buffer");
  239. pthread_mutex_unlock(&stream->write_buf_mutex);
  240. os_event_reset(
  241. stream->send_thread_signaled_exit);
  242. break;
  243. }
  244. pthread_mutex_unlock(&stream->write_buf_mutex);
  245. }
  246. int status = WaitForMultipleObjects(3, objs, false, INFINITE);
  247. if (status == WAIT_ABANDONED || status == WAIT_FAILED) {
  248. blog(LOG_ERROR, "socket_thread_windows: Aborting due "
  249. "to WaitForMultipleObjects failure");
  250. fatal_sock_shutdown(stream);
  251. return;
  252. }
  253. if (status == WAIT_OBJECT_0) {
  254. /* Socket event */
  255. if (!socket_event(stream, &can_write, last_send_time))
  256. return;
  257. } else if (status == WAIT_OBJECT_0 + 2) {
  258. /* Ideal send backlog event */
  259. ideal_send_backlog_event(stream, &can_write);
  260. ResetEvent(send_backlog_event);
  261. idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
  262. &send_backlog_overlapped, NULL);
  263. continue;
  264. }
  265. if (can_write) {
  266. for (;;) {
  267. enum data_ret ret = write_data(
  268. stream, &can_write, &last_send_time,
  269. latency_packet_size, delay_time);
  270. switch (ret) {
  271. case RET_BREAK:
  272. goto exit_write_loop;
  273. case RET_FATAL:
  274. return;
  275. case RET_CONTINUE:;
  276. }
  277. }
  278. }
  279. exit_write_loop:;
  280. }
  281. if (stream->rtmp.m_sb.sb_socket != INVALID_SOCKET)
  282. WSAEventSelect(stream->rtmp.m_sb.sb_socket,
  283. stream->socket_available_event, 0);
  284. blog(LOG_INFO, "socket_thread_windows: Normal exit");
  285. }
  286. void *socket_thread_windows(void *data)
  287. {
  288. struct rtmp_stream *stream = data;
  289. socket_thread_windows_internal(stream);
  290. return NULL;
  291. }
  292. #endif