rtmp-windows.c 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. #ifdef _WIN32
  2. #include "rtmp-stream.h"
  3. #include <winsock2.h>
  4. static void fatal_sock_shutdown(struct rtmp_stream *stream)
  5. {
  6. closesocket(stream->rtmp.m_sb.sb_socket);
  7. stream->rtmp.m_sb.sb_socket = -1;
  8. stream->write_buf_len = 0;
  9. }
  10. static bool socket_event(struct rtmp_stream *stream, bool *can_write,
  11. uint64_t last_send_time)
  12. {
  13. WSANETWORKEVENTS net_events;
  14. bool success;
  15. success = !!WSAEnumNetworkEvents(stream->rtmp.m_sb.sb_socket, NULL,
  16. &net_events);
  17. if (success) {
  18. blog(LOG_ERROR, "socket_thread_windows: Aborting due to "
  19. "WSAEnumNetworkEvents failure, %d",
  20. WSAGetLastError());
  21. fatal_sock_shutdown(stream);
  22. return false;
  23. }
  24. if (net_events.lNetworkEvents & FD_WRITE)
  25. *can_write = true;
  26. if (net_events.lNetworkEvents & FD_CLOSE) {
  27. if (last_send_time) {
  28. uint32_t diff =
  29. (os_gettime_ns() / 1000000) - last_send_time;
  30. blog(LOG_ERROR, "socket_thread_windows: Received "
  31. "FD_CLOSE, %u ms since last send "
  32. "(buffer: %d / %d)",
  33. diff,
  34. stream->write_buf_len,
  35. stream->write_buf_size);
  36. }
  37. if (os_event_try(stream->stop_event) != EAGAIN)
  38. blog(LOG_ERROR, "socket_thread_windows: Aborting due "
  39. "to FD_CLOSE during shutdown, "
  40. "%d bytes lost, error %d",
  41. stream->write_buf_len,
  42. net_events.iErrorCode[FD_CLOSE_BIT]);
  43. else
  44. blog(LOG_ERROR, "socket_thread_windows: Aborting due "
  45. "to FD_CLOSE, error %d",
  46. net_events.iErrorCode[FD_CLOSE_BIT]);
  47. fatal_sock_shutdown(stream);
  48. return false;
  49. }
  50. if (net_events.lNetworkEvents & FD_READ) {
  51. char discard[16384];
  52. int err_code;
  53. bool fatal = false;
  54. for (;;) {
  55. int ret = recv(stream->rtmp.m_sb.sb_socket,
  56. discard, sizeof(discard), 0);
  57. if (ret == -1) {
  58. err_code = WSAGetLastError();
  59. if (err_code == WSAEWOULDBLOCK)
  60. break;
  61. fatal = true;
  62. } else if (ret == 0) {
  63. err_code = 0;
  64. fatal = true;
  65. }
  66. if (fatal) {
  67. blog(LOG_ERROR, "socket_thread_windows: "
  68. "Socket error, recv() returned "
  69. "%d, GetLastError() %d",
  70. ret, err_code);
  71. fatal_sock_shutdown(stream);
  72. return false;
  73. }
  74. }
  75. }
  76. return true;
  77. }
  78. static void ideal_send_backlog_event(struct rtmp_stream *stream,
  79. bool *can_write)
  80. {
  81. ULONG ideal_send_backlog;
  82. int ret;
  83. ret = idealsendbacklogquery(
  84. stream->rtmp.m_sb.sb_socket,
  85. &ideal_send_backlog);
  86. if (ret == 0) {
  87. int cur_tcp_bufsize;
  88. int size = sizeof(cur_tcp_bufsize);
  89. ret = getsockopt(stream->rtmp.m_sb.sb_socket,
  90. SOL_SOCKET,
  91. SO_SNDBUF,
  92. (char *)&cur_tcp_bufsize,
  93. &size);
  94. if (ret == 0) {
  95. if (cur_tcp_bufsize < (int)ideal_send_backlog) {
  96. int bufsize = (int)ideal_send_backlog;
  97. setsockopt(stream->rtmp.m_sb.sb_socket,
  98. SOL_SOCKET,
  99. SO_SNDBUF,
  100. (const char *)&bufsize,
  101. sizeof(bufsize));
  102. blog(LOG_INFO, "socket_thread_windows: "
  103. "Increasing send buffer to "
  104. "ISB %d (buffer: %d / %d)",
  105. ideal_send_backlog,
  106. stream->write_buf_len,
  107. stream->write_buf_size);
  108. }
  109. } else {
  110. blog(LOG_ERROR, "socket_thread_windows: Got "
  111. "send_backlog_event but "
  112. "getsockopt() returned %d",
  113. WSAGetLastError());
  114. }
  115. } else {
  116. blog(LOG_ERROR, "socket_thread_windows: Got "
  117. "send_backlog_event but WSAIoctl() "
  118. "returned %d",
  119. WSAGetLastError());
  120. }
  121. }
  122. enum data_ret {
  123. RET_BREAK,
  124. RET_FATAL,
  125. RET_CONTINUE
  126. };
  127. static enum data_ret write_data(struct rtmp_stream *stream, bool *can_write,
  128. uint64_t *last_send_time, size_t latency_packet_size,
  129. int delay_time)
  130. {
  131. bool exit_loop = false;
  132. pthread_mutex_lock(&stream->write_buf_mutex);
  133. if (!stream->write_buf_len) {
  134. /* this is now an expected occasional condition due to use of
  135. * auto-reset events, we could end up emptying the buffer as
  136. * it's filled in a previous loop cycle, especially if using
  137. * low latency mode. */
  138. pthread_mutex_unlock(&stream->write_buf_mutex);
  139. /* blog(LOG_DEBUG, "socket_thread_windows: Trying to send, "
  140. "but no data available"); */
  141. return RET_BREAK;
  142. }
  143. int ret;
  144. if (stream->low_latency_mode) {
  145. size_t send_len =
  146. min(latency_packet_size, stream->write_buf_len);
  147. ret = send(stream->rtmp.m_sb.sb_socket,
  148. (const char *)stream->write_buf,
  149. (int)send_len, 0);
  150. } else {
  151. ret = send(stream->rtmp.m_sb.sb_socket,
  152. (const char *)stream->write_buf,
  153. (int)stream->write_buf_len, 0);
  154. }
  155. if (ret > 0) {
  156. if (stream->write_buf_len - ret)
  157. memmove(stream->write_buf,
  158. stream->write_buf + ret,
  159. stream->write_buf_len - ret);
  160. stream->write_buf_len -= ret;
  161. *last_send_time = os_gettime_ns() / 1000000;
  162. os_event_signal(stream->buffer_space_available_event);
  163. } else {
  164. int err_code;
  165. bool fatal_err = false;
  166. if (ret == -1) {
  167. err_code = WSAGetLastError();
  168. if (err_code == WSAEWOULDBLOCK) {
  169. *can_write = false;
  170. pthread_mutex_unlock(&stream->write_buf_mutex);
  171. return RET_BREAK;
  172. }
  173. fatal_err = true;
  174. } else if (ret == 0) {
  175. err_code = 0;
  176. fatal_err = true;
  177. }
  178. if (fatal_err) {
  179. /* connection closed, or connection was aborted /
  180. * socket closed / etc, that's a fatal error. */
  181. blog(LOG_ERROR, "socket_thread_windows: "
  182. "Socket error, send() returned %d, "
  183. "GetLastError() %d",
  184. ret, err_code);
  185. pthread_mutex_unlock(&stream->write_buf_mutex);
  186. fatal_sock_shutdown(stream);
  187. return RET_FATAL;
  188. }
  189. }
  190. /* finish writing for now */
  191. if (stream->write_buf_len <= 1000)
  192. exit_loop = true;
  193. pthread_mutex_unlock(&stream->write_buf_mutex);
  194. if (delay_time)
  195. os_sleep_ms(delay_time);
  196. return exit_loop ? RET_BREAK : RET_CONTINUE;
  197. }
  198. static inline void socket_thread_windows_internal(struct rtmp_stream *stream)
  199. {
  200. bool can_write = false;
  201. int delay_time;
  202. size_t latency_packet_size;
  203. uint64_t last_send_time = 0;
  204. HANDLE send_backlog_event;
  205. OVERLAPPED send_backlog_overlapped;
  206. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  207. WSAEventSelect(stream->rtmp.m_sb.sb_socket,
  208. stream->socket_available_event,
  209. FD_READ|FD_WRITE|FD_CLOSE);
  210. send_backlog_event = CreateEvent(NULL, true, false, NULL);
  211. if (stream->low_latency_mode) {
  212. delay_time = 1400.0f / (stream->write_buf_size / 1000.0f);
  213. latency_packet_size = 1460;
  214. } else {
  215. latency_packet_size = stream->write_buf_size;
  216. delay_time = 0;
  217. }
  218. if (!stream->disable_send_window_optimization) {
  219. memset(&send_backlog_overlapped, 0,
  220. sizeof(send_backlog_overlapped));
  221. send_backlog_overlapped.hEvent = send_backlog_event;
  222. idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
  223. &send_backlog_overlapped, NULL);
  224. } else {
  225. blog(LOG_INFO, "socket_thread_windows: Send window "
  226. "optimization disabled by user.");
  227. }
  228. HANDLE objs[3];
  229. objs[0] = stream->socket_available_event;
  230. objs[1] = stream->buffer_has_data_event;
  231. objs[2] = send_backlog_event;
  232. for (;;) {
  233. if (os_event_try(stream->stop_event) != EAGAIN) {
  234. pthread_mutex_lock(&stream->write_buf_mutex);
  235. if (stream->write_buf_len == 0) {
  236. //blog(LOG_DEBUG, "Exiting on empty buffer");
  237. pthread_mutex_unlock(&stream->write_buf_mutex);
  238. break;
  239. }
  240. pthread_mutex_unlock(&stream->write_buf_mutex);
  241. }
  242. int status = WaitForMultipleObjects(3, objs, false, INFINITE);
  243. if (status == WAIT_ABANDONED || status == WAIT_FAILED) {
  244. blog(LOG_ERROR, "socket_thread_windows: Aborting due "
  245. "to WaitForMultipleObjects failure");
  246. fatal_sock_shutdown(stream);
  247. return;
  248. }
  249. if (status == WAIT_OBJECT_0) {
  250. /* Socket event */
  251. if (!socket_event(stream, &can_write, last_send_time))
  252. return;
  253. } else if (status == WAIT_OBJECT_0 + 2) {
  254. /* Ideal send backlog event */
  255. ideal_send_backlog_event(stream, &can_write);
  256. ResetEvent(send_backlog_event);
  257. idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
  258. &send_backlog_overlapped, NULL);
  259. continue;
  260. }
  261. if (can_write) {
  262. for (;;) {
  263. enum data_ret ret = write_data(
  264. stream,
  265. &can_write,
  266. &last_send_time,
  267. latency_packet_size,
  268. delay_time);
  269. switch (ret) {
  270. case RET_BREAK:
  271. break;
  272. case RET_FATAL:
  273. return;
  274. case RET_CONTINUE:;
  275. }
  276. }
  277. }
  278. }
  279. blog(LOG_INFO, "socket_thread_windows: Normal exit");
  280. }
  281. void *socket_thread_windows(void *data)
  282. {
  283. struct rtmp_stream *stream = data;
  284. socket_thread_windows_internal(stream);
  285. return NULL;
  286. }
  287. #endif