rtmp-windows.c 8.5 KB


  1. #ifdef _WIN32
  2. #include "rtmp-stream.h"
  3. #include <winsock2.h>
  4. static void fatal_sock_shutdown(struct rtmp_stream *stream)
  5. {
  6. closesocket(stream->rtmp.m_sb.sb_socket);
  7. stream->rtmp.m_sb.sb_socket = -1;
  8. stream->write_buf_len = 0;
  9. os_event_signal(stream->buffer_space_available_event);
  10. }
  11. static bool socket_event(struct rtmp_stream *stream, bool *can_write,
  12. uint64_t last_send_time)
  13. {
  14. WSANETWORKEVENTS net_events;
  15. bool success;
  16. success = !WSAEnumNetworkEvents(stream->rtmp.m_sb.sb_socket, NULL,
  17. &net_events);
  18. if (!success) {
  19. blog(LOG_ERROR, "socket_thread_windows: Aborting due to "
  20. "WSAEnumNetworkEvents failure, %d",
  21. WSAGetLastError());
  22. fatal_sock_shutdown(stream);
  23. return false;
  24. }
  25. if (net_events.lNetworkEvents & FD_WRITE)
  26. *can_write = true;
  27. if (net_events.lNetworkEvents & FD_CLOSE) {
  28. if (last_send_time) {
  29. uint32_t diff =
  30. (os_gettime_ns() / 1000000) - last_send_time;
  31. blog(LOG_ERROR, "socket_thread_windows: Received "
  32. "FD_CLOSE, %u ms since last send "
  33. "(buffer: %d / %d)",
  34. diff,
  35. stream->write_buf_len,
  36. stream->write_buf_size);
  37. }
  38. if (os_event_try(stream->stop_event) != EAGAIN)
  39. blog(LOG_ERROR, "socket_thread_windows: Aborting due "
  40. "to FD_CLOSE during shutdown, "
  41. "%d bytes lost, error %d",
  42. stream->write_buf_len,
  43. net_events.iErrorCode[FD_CLOSE_BIT]);
  44. else
  45. blog(LOG_ERROR, "socket_thread_windows: Aborting due "
  46. "to FD_CLOSE, error %d",
  47. net_events.iErrorCode[FD_CLOSE_BIT]);
  48. fatal_sock_shutdown(stream);
  49. return false;
  50. }
  51. if (net_events.lNetworkEvents & FD_READ) {
  52. char discard[16384];
  53. int err_code;
  54. bool fatal = false;
  55. for (;;) {
  56. int ret = recv(stream->rtmp.m_sb.sb_socket,
  57. discard, sizeof(discard), 0);
  58. if (ret == -1) {
  59. err_code = WSAGetLastError();
  60. if (err_code == WSAEWOULDBLOCK)
  61. break;
  62. fatal = true;
  63. } else if (ret == 0) {
  64. err_code = 0;
  65. fatal = true;
  66. }
  67. if (fatal) {
  68. blog(LOG_ERROR, "socket_thread_windows: "
  69. "Socket error, recv() returned "
  70. "%d, GetLastError() %d",
  71. ret, err_code);
  72. stream->rtmp.last_error_code = err_code;
  73. fatal_sock_shutdown(stream);
  74. return false;
  75. }
  76. }
  77. }
  78. return true;
  79. }
  80. static void ideal_send_backlog_event(struct rtmp_stream *stream,
  81. bool *can_write)
  82. {
  83. ULONG ideal_send_backlog;
  84. int ret;
  85. ret = idealsendbacklogquery(
  86. stream->rtmp.m_sb.sb_socket,
  87. &ideal_send_backlog);
  88. if (ret == 0) {
  89. int cur_tcp_bufsize;
  90. int size = sizeof(cur_tcp_bufsize);
  91. ret = getsockopt(stream->rtmp.m_sb.sb_socket,
  92. SOL_SOCKET,
  93. SO_SNDBUF,
  94. (char *)&cur_tcp_bufsize,
  95. &size);
  96. if (ret == 0) {
  97. if (cur_tcp_bufsize < (int)ideal_send_backlog) {
  98. int bufsize = (int)ideal_send_backlog;
  99. setsockopt(stream->rtmp.m_sb.sb_socket,
  100. SOL_SOCKET,
  101. SO_SNDBUF,
  102. (const char *)&bufsize,
  103. sizeof(bufsize));
  104. blog(LOG_INFO, "socket_thread_windows: "
  105. "Increasing send buffer to "
  106. "ISB %d (buffer: %d / %d)",
  107. ideal_send_backlog,
  108. stream->write_buf_len,
  109. stream->write_buf_size);
  110. }
  111. } else {
  112. blog(LOG_ERROR, "socket_thread_windows: Got "
  113. "send_backlog_event but "
  114. "getsockopt() returned %d",
  115. WSAGetLastError());
  116. }
  117. } else {
  118. blog(LOG_ERROR, "socket_thread_windows: Got "
  119. "send_backlog_event but WSAIoctl() "
  120. "returned %d",
  121. WSAGetLastError());
  122. }
  123. }
  /* Result of one write_data() call, telling the send loop what to do next. */
  enum data_ret {
      /* Leave the inner write loop and go back to waiting for events. */
      RET_BREAK,
      /* The socket has been shut down; the thread function returns. */
      RET_FATAL,
      /* Call write_data() again right away. */
      RET_CONTINUE,
  };
  129. static enum data_ret write_data(struct rtmp_stream *stream, bool *can_write,
  130. uint64_t *last_send_time, size_t latency_packet_size,
  131. int delay_time)
  132. {
  133. bool exit_loop = false;
  134. pthread_mutex_lock(&stream->write_buf_mutex);
  135. if (!stream->write_buf_len) {
  136. /* this is now an expected occasional condition due to use of
  137. * auto-reset events, we could end up emptying the buffer as
  138. * it's filled in a previous loop cycle, especially if using
  139. * low latency mode. */
  140. pthread_mutex_unlock(&stream->write_buf_mutex);
  141. /* blog(LOG_DEBUG, "socket_thread_windows: Trying to send, "
  142. "but no data available"); */
  143. return RET_BREAK;
  144. }
  145. int ret;
  146. if (stream->low_latency_mode) {
  147. size_t send_len =
  148. min(latency_packet_size, stream->write_buf_len);
  149. ret = RTMPSockBuf_Send(&stream->rtmp.m_sb,
  150. (const char *)stream->write_buf,
  151. (int)send_len);
  152. } else {
  153. ret = RTMPSockBuf_Send(&stream->rtmp.m_sb,
  154. (const char *)stream->write_buf,
  155. (int)stream->write_buf_len);
  156. }
  157. if (ret > 0) {
  158. if (stream->write_buf_len - ret)
  159. memmove(stream->write_buf,
  160. stream->write_buf + ret,
  161. stream->write_buf_len - ret);
  162. stream->write_buf_len -= ret;
  163. *last_send_time = os_gettime_ns() / 1000000;
  164. os_event_signal(stream->buffer_space_available_event);
  165. } else {
  166. int err_code;
  167. bool fatal_err = false;
  168. if (ret == -1) {
  169. err_code = WSAGetLastError();
  170. if (err_code == WSAEWOULDBLOCK) {
  171. *can_write = false;
  172. pthread_mutex_unlock(&stream->write_buf_mutex);
  173. return RET_BREAK;
  174. }
  175. fatal_err = true;
  176. } else if (ret == 0) {
  177. err_code = 0;
  178. fatal_err = true;
  179. }
  180. if (fatal_err) {
  181. /* connection closed, or connection was aborted /
  182. * socket closed / etc, that's a fatal error. */
  183. blog(LOG_ERROR, "socket_thread_windows: "
  184. "Socket error, send() returned %d, "
  185. "GetLastError() %d",
  186. ret, err_code);
  187. pthread_mutex_unlock(&stream->write_buf_mutex);
  188. stream->rtmp.last_error_code = err_code;
  189. fatal_sock_shutdown(stream);
  190. return RET_FATAL;
  191. }
  192. }
  193. /* finish writing for now */
  194. if (stream->write_buf_len <= 1000)
  195. exit_loop = true;
  196. pthread_mutex_unlock(&stream->write_buf_mutex);
  197. if (delay_time)
  198. os_sleep_ms(delay_time);
  199. return exit_loop ? RET_BREAK : RET_CONTINUE;
  200. }
  201. #define LATENCY_FACTOR 20
  202. static inline void socket_thread_windows_internal(struct rtmp_stream *stream)
  203. {
  204. bool can_write = false;
  205. int delay_time;
  206. size_t latency_packet_size;
  207. uint64_t last_send_time = 0;
  208. HANDLE send_backlog_event;
  209. OVERLAPPED send_backlog_overlapped;
  210. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  211. WSAEventSelect(stream->rtmp.m_sb.sb_socket,
  212. stream->socket_available_event,
  213. FD_READ|FD_WRITE|FD_CLOSE);
  214. send_backlog_event = CreateEvent(NULL, true, false, NULL);
  215. if (stream->low_latency_mode) {
  216. delay_time = 1000 / LATENCY_FACTOR;
  217. latency_packet_size = stream->write_buf_size / (LATENCY_FACTOR - 2);
  218. } else {
  219. latency_packet_size = stream->write_buf_size;
  220. delay_time = 0;
  221. }
  222. if (!stream->disable_send_window_optimization) {
  223. memset(&send_backlog_overlapped, 0,
  224. sizeof(send_backlog_overlapped));
  225. send_backlog_overlapped.hEvent = send_backlog_event;
  226. idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
  227. &send_backlog_overlapped, NULL);
  228. } else {
  229. blog(LOG_INFO, "socket_thread_windows: Send window "
  230. "optimization disabled by user.");
  231. }
  232. HANDLE objs[3];
  233. objs[0] = stream->socket_available_event;
  234. objs[1] = stream->buffer_has_data_event;
  235. objs[2] = send_backlog_event;
  236. for (;;) {
  237. if (os_event_try(stream->send_thread_signaled_exit) != EAGAIN) {
  238. pthread_mutex_lock(&stream->write_buf_mutex);
  239. if (stream->write_buf_len == 0) {
  240. //blog(LOG_DEBUG, "Exiting on empty buffer");
  241. pthread_mutex_unlock(&stream->write_buf_mutex);
  242. os_event_reset(stream->send_thread_signaled_exit);
  243. break;
  244. }
  245. pthread_mutex_unlock(&stream->write_buf_mutex);
  246. }
  247. int status = WaitForMultipleObjects(3, objs, false, INFINITE);
  248. if (status == WAIT_ABANDONED || status == WAIT_FAILED) {
  249. blog(LOG_ERROR, "socket_thread_windows: Aborting due "
  250. "to WaitForMultipleObjects failure");
  251. fatal_sock_shutdown(stream);
  252. return;
  253. }
  254. if (status == WAIT_OBJECT_0) {
  255. /* Socket event */
  256. if (!socket_event(stream, &can_write, last_send_time))
  257. return;
  258. } else if (status == WAIT_OBJECT_0 + 2) {
  259. /* Ideal send backlog event */
  260. ideal_send_backlog_event(stream, &can_write);
  261. ResetEvent(send_backlog_event);
  262. idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
  263. &send_backlog_overlapped, NULL);
  264. continue;
  265. }
  266. if (can_write) {
  267. for (;;) {
  268. enum data_ret ret = write_data(
  269. stream,
  270. &can_write,
  271. &last_send_time,
  272. latency_packet_size,
  273. delay_time);
  274. switch (ret) {
  275. case RET_BREAK:
  276. goto exit_write_loop;
  277. case RET_FATAL:
  278. return;
  279. case RET_CONTINUE:;
  280. }
  281. }
  282. }
  283. exit_write_loop:;
  284. }
  285. if (stream->rtmp.m_sb.sb_socket != INVALID_SOCKET)
  286. WSAEventSelect(stream->rtmp.m_sb.sb_socket,
  287. stream->socket_available_event, 0);
  288. blog(LOG_INFO, "socket_thread_windows: Normal exit");
  289. }
  /*
   * Entry point of the Windows socket send thread.
   *
   * data: the struct rtmp_stream to service, passed as an untyped pointer
   *       (the signature has the shape of a pthread start routine).
   *
   * Runs socket_thread_windows_internal() to completion and always
   * returns NULL.
   */
  void *socket_thread_windows(void *data)
  {
      socket_thread_windows_internal((struct rtmp_stream *)data);
      return NULL;
  }
  296. #endif