123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- #ifdef _WIN32
- #include "rtmp-stream.h"
- #include <winsock2.h>
- static void fatal_sock_shutdown(struct rtmp_stream *stream)
- {
- closesocket(stream->rtmp.m_sb.sb_socket);
- stream->rtmp.m_sb.sb_socket = -1;
- stream->write_buf_len = 0;
- os_event_signal(stream->buffer_space_available_event);
- }
- static bool socket_event(struct rtmp_stream *stream, bool *can_write,
- uint64_t last_send_time)
- {
- WSANETWORKEVENTS net_events;
- bool success;
- success = !WSAEnumNetworkEvents(stream->rtmp.m_sb.sb_socket, NULL,
- &net_events);
- if (!success) {
- blog(LOG_ERROR,
- "socket_thread_windows: Aborting due to "
- "WSAEnumNetworkEvents failure, %d",
- WSAGetLastError());
- fatal_sock_shutdown(stream);
- return false;
- }
- if (net_events.lNetworkEvents & FD_WRITE)
- *can_write = true;
- if (net_events.lNetworkEvents & FD_CLOSE) {
- if (last_send_time) {
- uint32_t diff =
- (os_gettime_ns() / 1000000) - last_send_time;
- blog(LOG_ERROR,
- "socket_thread_windows: Received "
- "FD_CLOSE, %u ms since last send "
- "(buffer: %d / %d)",
- diff, stream->write_buf_len,
- stream->write_buf_size);
- }
- if (os_event_try(stream->stop_event) != EAGAIN)
- blog(LOG_ERROR,
- "socket_thread_windows: Aborting due "
- "to FD_CLOSE during shutdown, "
- "%d bytes lost, error %d",
- stream->write_buf_len,
- net_events.iErrorCode[FD_CLOSE_BIT]);
- else
- blog(LOG_ERROR,
- "socket_thread_windows: Aborting due "
- "to FD_CLOSE, error %d",
- net_events.iErrorCode[FD_CLOSE_BIT]);
- fatal_sock_shutdown(stream);
- return false;
- }
- if (net_events.lNetworkEvents & FD_READ) {
- char discard[16384];
- int err_code;
- bool fatal = false;
- for (;;) {
- int ret = recv(stream->rtmp.m_sb.sb_socket, discard,
- sizeof(discard), 0);
- if (ret == -1) {
- err_code = WSAGetLastError();
- if (err_code == WSAEWOULDBLOCK)
- break;
- fatal = true;
- } else if (ret == 0) {
- err_code = 0;
- fatal = true;
- }
- if (fatal) {
- blog(LOG_ERROR,
- "socket_thread_windows: "
- "Socket error, recv() returned "
- "%d, GetLastError() %d",
- ret, err_code);
- stream->rtmp.last_error_code = err_code;
- fatal_sock_shutdown(stream);
- return false;
- }
- }
- }
- return true;
- }
- static void ideal_send_backlog_event(struct rtmp_stream *stream,
- bool *can_write)
- {
- ULONG ideal_send_backlog;
- int ret;
- ret = idealsendbacklogquery(stream->rtmp.m_sb.sb_socket,
- &ideal_send_backlog);
- if (ret == 0) {
- int cur_tcp_bufsize;
- int size = sizeof(cur_tcp_bufsize);
- ret = getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
- SO_SNDBUF, (char *)&cur_tcp_bufsize, &size);
- if (ret == 0) {
- if (cur_tcp_bufsize < (int)ideal_send_backlog) {
- int bufsize = (int)ideal_send_backlog;
- setsockopt(stream->rtmp.m_sb.sb_socket,
- SOL_SOCKET, SO_SNDBUF,
- (const char *)&bufsize,
- sizeof(bufsize));
- blog(LOG_INFO,
- "socket_thread_windows: "
- "Increasing send buffer to "
- "ISB %d (buffer: %d / %d)",
- ideal_send_backlog, stream->write_buf_len,
- stream->write_buf_size);
- }
- } else {
- blog(LOG_ERROR,
- "socket_thread_windows: Got "
- "send_backlog_event but "
- "getsockopt() returned %d",
- WSAGetLastError());
- }
- } else {
- blog(LOG_ERROR,
- "socket_thread_windows: Got "
- "send_backlog_event but WSAIoctl() "
- "returned %d",
- WSAGetLastError());
- }
- }
/* Result of one write_data() attempt, as interpreted by the send loop. */
enum data_ret {
	RET_BREAK,    /* stop writing for now and go back to waiting */
	RET_FATAL,    /* unrecoverable socket error, the thread must exit */
	RET_CONTINUE, /* more data may be written right away */
};
- static enum data_ret write_data(struct rtmp_stream *stream, bool *can_write,
- uint64_t *last_send_time,
- size_t latency_packet_size, int delay_time)
- {
- bool exit_loop = false;
- pthread_mutex_lock(&stream->write_buf_mutex);
- if (!stream->write_buf_len) {
- /* this is now an expected occasional condition due to use of
- * auto-reset events, we could end up emptying the buffer as
- * it's filled in a previous loop cycle, especially if using
- * low latency mode. */
- pthread_mutex_unlock(&stream->write_buf_mutex);
- /* blog(LOG_DEBUG, "socket_thread_windows: Trying to send, "
- "but no data available"); */
- return RET_BREAK;
- }
- int ret;
- if (stream->low_latency_mode) {
- size_t send_len =
- min(latency_packet_size, stream->write_buf_len);
- ret = RTMPSockBuf_Send(&stream->rtmp.m_sb,
- (const char *)stream->write_buf,
- (int)send_len);
- } else {
- ret = RTMPSockBuf_Send(&stream->rtmp.m_sb,
- (const char *)stream->write_buf,
- (int)stream->write_buf_len);
- }
- if (ret > 0) {
- if (stream->write_buf_len - ret)
- memmove(stream->write_buf, stream->write_buf + ret,
- stream->write_buf_len - ret);
- stream->write_buf_len -= ret;
- *last_send_time = os_gettime_ns() / 1000000;
- os_event_signal(stream->buffer_space_available_event);
- } else {
- int err_code;
- bool fatal_err = false;
- if (ret == -1) {
- err_code = WSAGetLastError();
- if (err_code == WSAEWOULDBLOCK) {
- *can_write = false;
- pthread_mutex_unlock(&stream->write_buf_mutex);
- return RET_BREAK;
- }
- fatal_err = true;
- } else if (ret == 0) {
- err_code = 0;
- fatal_err = true;
- }
- if (fatal_err) {
- /* connection closed, or connection was aborted /
- * socket closed / etc, that's a fatal error. */
- blog(LOG_ERROR,
- "socket_thread_windows: "
- "Socket error, send() returned %d, "
- "GetLastError() %d",
- ret, err_code);
- pthread_mutex_unlock(&stream->write_buf_mutex);
- stream->rtmp.last_error_code = err_code;
- fatal_sock_shutdown(stream);
- return RET_FATAL;
- }
- }
- /* finish writing for now */
- if (stream->write_buf_len <= 1000)
- exit_loop = true;
- pthread_mutex_unlock(&stream->write_buf_mutex);
- if (delay_time)
- os_sleep_ms(delay_time);
- return exit_loop ? RET_BREAK : RET_CONTINUE;
- }
- #define LATENCY_FACTOR 20
- static inline void socket_thread_windows_internal(struct rtmp_stream *stream)
- {
- bool can_write = false;
- int delay_time;
- size_t latency_packet_size;
- uint64_t last_send_time = 0;
- HANDLE send_backlog_event;
- OVERLAPPED send_backlog_overlapped;
- SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
- WSAEventSelect(stream->rtmp.m_sb.sb_socket,
- stream->socket_available_event,
- FD_READ | FD_WRITE | FD_CLOSE);
- send_backlog_event = CreateEvent(NULL, true, false, NULL);
- if (stream->low_latency_mode) {
- delay_time = 1000 / LATENCY_FACTOR;
- latency_packet_size =
- stream->write_buf_size / (LATENCY_FACTOR - 2);
- } else {
- latency_packet_size = stream->write_buf_size;
- delay_time = 0;
- }
- if (!stream->disable_send_window_optimization) {
- memset(&send_backlog_overlapped, 0,
- sizeof(send_backlog_overlapped));
- send_backlog_overlapped.hEvent = send_backlog_event;
- idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
- &send_backlog_overlapped, NULL);
- } else {
- blog(LOG_INFO, "socket_thread_windows: Send window "
- "optimization disabled by user.");
- }
- HANDLE objs[3];
- objs[0] = stream->socket_available_event;
- objs[1] = stream->buffer_has_data_event;
- objs[2] = send_backlog_event;
- for (;;) {
- if (os_event_try(stream->send_thread_signaled_exit) != EAGAIN) {
- pthread_mutex_lock(&stream->write_buf_mutex);
- if (stream->write_buf_len == 0) {
- //blog(LOG_DEBUG, "Exiting on empty buffer");
- pthread_mutex_unlock(&stream->write_buf_mutex);
- os_event_reset(
- stream->send_thread_signaled_exit);
- break;
- }
- pthread_mutex_unlock(&stream->write_buf_mutex);
- }
- int status = WaitForMultipleObjects(3, objs, false, INFINITE);
- if (status == WAIT_ABANDONED || status == WAIT_FAILED) {
- blog(LOG_ERROR, "socket_thread_windows: Aborting due "
- "to WaitForMultipleObjects failure");
- fatal_sock_shutdown(stream);
- return;
- }
- if (status == WAIT_OBJECT_0) {
- /* Socket event */
- if (!socket_event(stream, &can_write, last_send_time))
- return;
- } else if (status == WAIT_OBJECT_0 + 2) {
- /* Ideal send backlog event */
- ideal_send_backlog_event(stream, &can_write);
- ResetEvent(send_backlog_event);
- idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
- &send_backlog_overlapped, NULL);
- continue;
- }
- if (can_write) {
- for (;;) {
- enum data_ret ret = write_data(
- stream, &can_write, &last_send_time,
- latency_packet_size, delay_time);
- switch (ret) {
- case RET_BREAK:
- goto exit_write_loop;
- case RET_FATAL:
- return;
- case RET_CONTINUE:;
- }
- }
- }
- exit_write_loop:;
- }
- if (stream->rtmp.m_sb.sb_socket != INVALID_SOCKET)
- WSAEventSelect(stream->rtmp.m_sb.sb_socket,
- stream->socket_available_event, 0);
- blog(LOG_INFO, "socket_thread_windows: Normal exit");
- }
/*
 * Thread entry point for the Windows socket send thread.
 *
 * data is the struct rtmp_stream to service. Always returns NULL.
 */
void *socket_thread_windows(void *data)
{
	socket_thread_windows_internal((struct rtmp_stream *)data);
	return NULL;
}
- #endif
|