|
|
@@ -17,7 +17,7 @@
|
|
|
|
|
|
#include <obs.h>
|
|
|
#include <obs-avc.h>
|
|
|
-#include <util/darray.h>
|
|
|
+#include <util/circlebuf.h>
|
|
|
#include <util/dstr.h>
|
|
|
#include <util/threading.h>
|
|
|
#include "librtmp/rtmp.h"
|
|
|
@@ -25,23 +25,24 @@
|
|
|
#include "flv-mux.h"
|
|
|
|
|
|
struct rtmp_stream {
|
|
|
- obs_output_t output;
|
|
|
+ obs_output_t output;
|
|
|
|
|
|
- pthread_mutex_t packets_mutex;
|
|
|
- DARRAY(struct encoder_packet) packets;
|
|
|
+ pthread_mutex_t packets_mutex;
|
|
|
+ struct circlebuf packets;
|
|
|
|
|
|
- bool connecting;
|
|
|
- bool active;
|
|
|
- pthread_t connect_thread;
|
|
|
- pthread_t send_thread;
|
|
|
+ bool connecting;
|
|
|
+ pthread_t connect_thread;
|
|
|
|
|
|
- os_sem_t send_sem;
|
|
|
- os_event_t stop_event;
|
|
|
+ bool active;
|
|
|
+ pthread_t send_thread;
|
|
|
|
|
|
- struct dstr path, key;
|
|
|
- struct dstr username, password;
|
|
|
+ os_sem_t send_sem;
|
|
|
+ os_event_t stop_event;
|
|
|
|
|
|
- RTMP rtmp;
|
|
|
+ struct dstr path, key;
|
|
|
+ struct dstr username, password;
|
|
|
+
|
|
|
+ RTMP rtmp;
|
|
|
};
|
|
|
|
|
|
static const char *rtmp_stream_getname(const char *locale)
|
|
|
@@ -58,11 +59,25 @@ static void log_rtmp(int level, const char *format, va_list args)
|
|
|
UNUSED_PARAMETER(level);
|
|
|
}
|
|
|
|
|
|
+static inline void free_packets(struct rtmp_stream *stream)
|
|
|
+{
|
|
|
+ while (stream->packets.size) {
|
|
|
+ struct encoder_packet packet;
|
|
|
+ circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
|
|
|
+ obs_free_encoder_packet(&packet);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+static void rtmp_stream_stop(void *data);
|
|
|
+
|
|
|
static void rtmp_stream_destroy(void *data)
|
|
|
{
|
|
|
struct rtmp_stream *stream = data;
|
|
|
|
|
|
if (stream) {
|
|
|
+ rtmp_stream_stop(stream);
|
|
|
+
|
|
|
+ free_packets(stream);
|
|
|
dstr_free(&stream->path);
|
|
|
dstr_free(&stream->key);
|
|
|
dstr_free(&stream->username);
|
|
|
@@ -99,13 +114,21 @@ fail:
|
|
|
|
|
|
static void rtmp_stream_stop(void *data)
|
|
|
{
|
|
|
- UNUSED_PARAMETER(data);
|
|
|
-}
|
|
|
+ struct rtmp_stream *stream = data;
|
|
|
+ void *ret;
|
|
|
|
|
|
-static void rtmp_stream_update(void *data, obs_data_t settings)
|
|
|
-{
|
|
|
- UNUSED_PARAMETER(data);
|
|
|
- UNUSED_PARAMETER(settings);
|
|
|
+ os_event_signal(stream->stop_event);
|
|
|
+
|
|
|
+ if (stream->connecting)
|
|
|
+ pthread_join(stream->connect_thread, &ret);
|
|
|
+
|
|
|
+ if (stream->active) {
|
|
|
+ obs_output_end_data_capture(stream->output);
|
|
|
+ os_sem_post(stream->send_sem);
|
|
|
+ pthread_join(stream->send_thread, &ret);
|
|
|
+ }
|
|
|
+
|
|
|
+ os_event_reset(stream->stop_event);
|
|
|
}
|
|
|
|
|
|
static inline void set_rtmp_str(AVal *val, const char *str)
|
|
|
@@ -128,44 +151,70 @@ static inline bool get_next_packet(struct rtmp_stream *stream,
|
|
|
bool new_packet = false;
|
|
|
|
|
|
pthread_mutex_lock(&stream->packets_mutex);
|
|
|
- if (stream->packets.num) {
|
|
|
- *packet = stream->packets.array[0];
|
|
|
+ if (stream->packets.size) {
|
|
|
+ circlebuf_pop_front(&stream->packets, packet,
|
|
|
+ sizeof(struct encoder_packet));
|
|
|
new_packet = true;
|
|
|
- da_erase(stream->packets, 0);
|
|
|
}
|
|
|
pthread_mutex_unlock(&stream->packets_mutex);
|
|
|
|
|
|
return new_packet;
|
|
|
}
|
|
|
|
|
|
-static void send_packet(struct rtmp_stream *stream,
|
|
|
+static int send_packet(struct rtmp_stream *stream,
|
|
|
struct encoder_packet *packet, bool is_header)
|
|
|
{
|
|
|
uint8_t *data;
|
|
|
size_t size;
|
|
|
+ int ret;
|
|
|
|
|
|
flv_packet_mux(packet, &data, &size, is_header);
|
|
|
- RTMP_Write(&stream->rtmp, (char*)data, (int)size);
|
|
|
+ ret = RTMP_Write(&stream->rtmp, (char*)data, (int)size);
|
|
|
bfree(data);
|
|
|
+
|
|
|
+ obs_free_encoder_packet(packet);
|
|
|
+ return ret;
|
|
|
+}
|
|
|
+
|
|
|
+static bool send_remaining_packets(struct rtmp_stream *stream)
|
|
|
+{
|
|
|
+ struct encoder_packet packet;
|
|
|
+
|
|
|
+ while (get_next_packet(stream, &packet))
|
|
|
+ if (send_packet(stream, &packet, false) < 0)
|
|
|
+ return false;
|
|
|
+
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
static void *send_thread(void *data)
|
|
|
{
|
|
|
struct rtmp_stream *stream = data;
|
|
|
+ bool disconnected = false;
|
|
|
|
|
|
while (os_sem_wait(stream->send_sem) == 0) {
|
|
|
struct encoder_packet packet;
|
|
|
|
|
|
if (os_event_try(stream->stop_event) != EAGAIN)
|
|
|
break;
|
|
|
-
|
|
|
if (!get_next_packet(stream, &packet))
|
|
|
continue;
|
|
|
-
|
|
|
- send_packet(stream, &packet, false);
|
|
|
- obs_free_encoder_packet(&packet);
|
|
|
+ if (send_packet(stream, &packet, false) < 0) {
|
|
|
+ disconnected = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ if (!disconnected && !send_remaining_packets(stream))
|
|
|
+ disconnected = true;
|
|
|
+
|
|
|
+ if (disconnected)
|
|
|
+ free_packets(stream);
|
|
|
+
|
|
|
+ if (os_event_try(stream->stop_event) == EAGAIN)
|
|
|
+ pthread_detach(stream->send_thread);
|
|
|
+
|
|
|
+ stream->active = false;
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
@@ -173,8 +222,8 @@ static void *send_thread(void *data)
|
|
|
|
|
|
static void send_meta_data(struct rtmp_stream *stream)
|
|
|
{
|
|
|
- uint8_t *meta_data;
|
|
|
- size_t meta_data_size;
|
|
|
+ uint8_t *meta_data;
|
|
|
+ size_t meta_data_size;
|
|
|
|
|
|
flv_meta_data(stream->output, &meta_data, &meta_data_size);
|
|
|
RTMP_Write(&stream->rtmp, (char*)meta_data, (int)meta_data_size);
|
|
|
@@ -183,8 +232,8 @@ static void send_meta_data(struct rtmp_stream *stream)
|
|
|
|
|
|
static void send_audio_header(struct rtmp_stream *stream)
|
|
|
{
|
|
|
- obs_output_t context = stream->output;
|
|
|
- obs_encoder_t aencoder = obs_output_get_audio_encoder(context);
|
|
|
+ obs_output_t context = stream->output;
|
|
|
+ obs_encoder_t aencoder = obs_output_get_audio_encoder(context);
|
|
|
|
|
|
struct encoder_packet packet = {
|
|
|
.type = OBS_ENCODER_AUDIO,
|
|
|
@@ -197,10 +246,10 @@ static void send_audio_header(struct rtmp_stream *stream)
|
|
|
|
|
|
static void send_video_header(struct rtmp_stream *stream)
|
|
|
{
|
|
|
- obs_output_t context = stream->output;
|
|
|
- obs_encoder_t vencoder = obs_output_get_video_encoder(context);
|
|
|
- uint8_t *header;
|
|
|
- size_t size;
|
|
|
+ obs_output_t context = stream->output;
|
|
|
+ obs_encoder_t vencoder = obs_output_get_video_encoder(context);
|
|
|
+ uint8_t *header;
|
|
|
+ size_t size;
|
|
|
|
|
|
struct encoder_packet packet = {
|
|
|
.type = OBS_ENCODER_VIDEO,
|
|
|
@@ -208,7 +257,7 @@ static void send_video_header(struct rtmp_stream *stream)
|
|
|
};
|
|
|
|
|
|
obs_encoder_get_extra_data(vencoder, &header, &size);
|
|
|
- packet.size = obs_create_avc_header(&packet.data, header, size);
|
|
|
+ packet.size = obs_parse_avc_header(&packet.data, header, size);
|
|
|
send_packet(stream, &packet, true);
|
|
|
obs_free_encoder_packet(&packet);
|
|
|
}
|
|
|
@@ -230,11 +279,11 @@ static inline bool reset_semaphore(struct rtmp_stream *stream)
|
|
|
#define socklen_t int
|
|
|
#endif
|
|
|
|
|
|
-static void init_send(struct rtmp_stream *stream)
|
|
|
+static int init_send(struct rtmp_stream *stream)
|
|
|
{
|
|
|
int cur_sendbuf_size = MIN_SENDBUF_SIZE;
|
|
|
socklen_t size = sizeof(int);
|
|
|
- socklen_t ret;
|
|
|
+ int ret;
|
|
|
|
|
|
getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF,
|
|
|
(char*)&cur_sendbuf_size, &size);
|
|
|
@@ -248,11 +297,14 @@ static void init_send(struct rtmp_stream *stream)
|
|
|
reset_semaphore(stream);
|
|
|
|
|
|
ret = pthread_create(&stream->send_thread, NULL, send_thread, stream);
|
|
|
- if (ret != 0)
|
|
|
- bcrash("Failed to create send thread");
|
|
|
+ if (ret != 0) {
|
|
|
+ RTMP_Close(&stream->rtmp);
|
|
|
+ return OBS_OUTPUT_FAIL;
|
|
|
+ }
|
|
|
|
|
|
send_headers(stream);
|
|
|
obs_output_begin_data_capture(stream->output, 0);
|
|
|
+ return OBS_OUTPUT_SUCCESS;
|
|
|
}
|
|
|
|
|
|
static int try_connect(struct rtmp_stream *stream)
|
|
|
@@ -276,13 +328,21 @@ static int try_connect(struct rtmp_stream *stream)
|
|
|
if (!RTMP_ConnectStream(&stream->rtmp, 0))
|
|
|
return OBS_OUTPUT_INVALID_STREAM;
|
|
|
|
|
|
- init_send(stream);
|
|
|
- return OBS_OUTPUT_SUCCESS;
|
|
|
+ return init_send(stream);
|
|
|
}
|
|
|
|
|
|
static void *connect_thread(void *data)
|
|
|
{
|
|
|
- UNUSED_PARAMETER(data);
|
|
|
+ struct rtmp_stream *stream = data;
|
|
|
+ int ret = try_connect(stream);
|
|
|
+
|
|
|
+ if (ret != OBS_OUTPUT_SUCCESS)
|
|
|
+ obs_output_signal_start_fail(stream->output, ret);
|
|
|
+
|
|
|
+ if (os_event_try(stream->stop_event) == EAGAIN)
|
|
|
+ pthread_detach(stream->connect_thread);
|
|
|
+
|
|
|
+ stream->connecting = false;
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
@@ -305,8 +365,46 @@ static bool rtmp_stream_start(void *data)
|
|
|
stream) != 0;
|
|
|
}
|
|
|
|
|
|
-static bool rtmp_stream_active(void *data)
|
|
|
+static void rtmp_stream_data(void *data, struct encoder_packet *packet)
|
|
|
{
|
|
|
- UNUSED_PARAMETER(data);
|
|
|
- return false;
|
|
|
+ struct rtmp_stream *stream = data;
|
|
|
+ struct encoder_packet new_packet;
|
|
|
+
|
|
|
+ if (packet->type == OBS_ENCODER_AUDIO)
|
|
|
+ obs_duplicate_encoder_packet(&new_packet, packet);
|
|
|
+ else if (packet->type == OBS_ENCODER_VIDEO)
|
|
|
+ obs_parse_avc_packet(&new_packet, packet);
|
|
|
+
|
|
|
+ pthread_mutex_lock(&stream->packets_mutex);
|
|
|
+ circlebuf_push_back(&stream->packets, &new_packet, sizeof(new_packet));
|
|
|
+ pthread_mutex_unlock(&stream->packets_mutex);
|
|
|
+ os_sem_post(stream->send_sem);
|
|
|
+}
|
|
|
+
|
|
|
+static obs_properties_t rtmp_stream_properties(const char *locale)
|
|
|
+{
|
|
|
+ obs_properties_t props = obs_properties_create();
|
|
|
+
|
|
|
+ /* TODO: locale */
|
|
|
+ obs_properties_add_text(props, "path", "Stream URL", OBS_TEXT_DEFAULT);
|
|
|
+ obs_properties_add_text(props, "key", "Stream Key", OBS_TEXT_PASSWORD);
|
|
|
+ obs_properties_add_text(props, "username", "User Name",
|
|
|
+ OBS_TEXT_DEFAULT);
|
|
|
+ obs_properties_add_text(props, "password", "Password",
|
|
|
+ OBS_TEXT_PASSWORD);
|
|
|
+
|
|
|
+ UNUSED_PARAMETER(locale);
|
|
|
+ return props;
|
|
|
}
|
|
|
+
|
|
|
+struct obs_output_info rtmp_output_info = {
|
|
|
+ .id = "rtmp_output",
|
|
|
+ .flags = OBS_OUTPUT_AV | OBS_OUTPUT_ENCODED | OBS_OUTPUT_SERVICE,
|
|
|
+ .getname = rtmp_stream_getname,
|
|
|
+ .create = rtmp_stream_create,
|
|
|
+ .destroy = rtmp_stream_destroy,
|
|
|
+ .start = rtmp_stream_start,
|
|
|
+ .stop = rtmp_stream_stop,
|
|
|
+ .encoded_data = rtmp_stream_data,
|
|
|
+ .properties = rtmp_stream_properties
|
|
|
+};
|