Ver código fonte

(API Change) libobs: Fix output data cutoff on stop

(Note: This commit also modifies obs-ffmpeg and obs-outputs)

API Changed:
obs_output_info::void (*stop)(void *data);

To:
obs_output_info::void (*stop)(void *data, uint64_t ts);

This fixes the long-time design flaw where obs_output_stop and the
output 'stop' callback would just shut down the output without
considering the timing of when obs_output_stop was used, discarding any
possible buffering and causing the output to get cut off at an
unexpected timing.

The 'stop' callback of obs_output_info now takes a timestamp with the
expectation that the output will use that timestamp to stop output data
in accordance to that timing.  obs_output_stop now records the timestamp
at the time that the function is called and calls the 'stop' callback
with that timestamp.  If needed, obs_output_force_stop will still stop
the output immediately without buffering.
jp9000 9 anos atrás
pai
commit
d7db0b8b01

+ 5 - 3
libobs/obs-internal.h

@@ -781,6 +781,7 @@ struct obs_output {
 
 	bool                            received_video;
 	bool                            received_audio;
+	volatile bool                   data_active;
 	volatile bool                   end_data_capture_thread_active;
 	int64_t                         video_offset;
 	int64_t                         audio_offsets[MAX_AUDIO_MIXES];
@@ -790,14 +791,15 @@ struct obs_output {
 	os_event_t                      *stopping_event;
 	pthread_mutex_t                 interleaved_mutex;
 	DARRAY(struct encoder_packet)   interleaved_packets;
+	int                             stop_code;
 
 	int                             reconnect_retry_sec;
 	int                             reconnect_retry_max;
 	int                             reconnect_retries;
 	int                             reconnect_retry_cur_sec;
-	volatile bool                   reconnecting;
 	pthread_t                       reconnect_thread;
 	os_event_t                      *reconnect_stop_event;
+	volatile bool                   reconnecting;
 	volatile bool                   reconnect_thread_active;
 
 	uint32_t                        starting_drawn_count;
@@ -808,7 +810,6 @@ struct obs_output {
 	int                             total_frames;
 
 	volatile bool                   active;
-	volatile bool                   stopped;
 	video_t                         *video;
 	audio_t                         *audio;
 	obs_encoder_t                   *video_encoder;
@@ -852,7 +853,8 @@ extern void obs_output_cleanup_delay(obs_output_t *output);
 extern bool obs_output_delay_start(obs_output_t *output);
 extern void obs_output_delay_stop(obs_output_t *output);
 extern bool obs_output_actual_start(obs_output_t *output);
-extern void obs_output_actual_stop(obs_output_t *output, bool force);
+extern void obs_output_actual_stop(obs_output_t *output, bool force,
+		uint64_t ts);
 
 extern const struct obs_output_info *find_output(const char *id);
 

+ 3 - 2
libobs/obs-output-delay.c

@@ -56,7 +56,7 @@ static inline void process_delay_data(struct obs_output *output,
 		obs_output_actual_start(output);
 		break;
 	case DELAY_MSG_STOP:
-		obs_output_actual_stop(output, false);
+		obs_output_actual_stop(output, false, dd->ts);
 		break;
 	}
 }
@@ -151,8 +151,9 @@ bool obs_output_delay_start(obs_output_t *output)
 	circlebuf_push_back(&output->delay_data, &dd, sizeof(dd));
 	pthread_mutex_unlock(&output->delay_mutex);
 
+	os_atomic_inc_long(&output->delay_restart_refs);
+
 	if (delay_active(output)) {
-		os_atomic_inc_long(&output->delay_restart_refs);
 		do_output_signal(output, "starting");
 		return true;
 	}

+ 124 - 49
libobs/obs-output.c

@@ -50,8 +50,6 @@ static inline bool data_capture_ending(const struct obs_output *output)
 	return os_atomic_load_bool(&output->end_data_capture_thread_active);
 }
 
-static inline void signal_stop(struct obs_output *output, int code);
-
 const struct obs_output_info *find_output(const char *id)
 {
 	size_t i;
@@ -171,7 +169,7 @@ void obs_output_destroy(obs_output_t *output)
 		blog(LOG_INFO, "output '%s' destroyed", output->context.name);
 
 		if (output->valid && active(output))
-			obs_output_actual_stop(output, true);
+			obs_output_actual_stop(output, true, 0);
 		if (output->service)
 			output->service->output = NULL;
 
@@ -218,6 +216,7 @@ bool obs_output_actual_start(obs_output_t *output)
 	bool success = false;
 
 	os_event_wait(output->stopping_event);
+	output->stop_code = 0;
 
 	if (output->context.data)
 		success = output->info.start(output->context.data);
@@ -259,6 +258,11 @@ bool obs_output_start(obs_output_t *output)
 	}
 }
 
+static inline bool data_active(struct obs_output *output)
+{
+	return os_atomic_load_bool(&output->data_active);
+}
+
 static void log_frame_info(struct obs_output *output)
 {
 	struct obs_core_video *video = &obs->video;
@@ -309,30 +313,47 @@ static void log_frame_info(struct obs_output *output)
 				dropped, percentage_dropped);
 }
 
-void obs_output_actual_stop(obs_output_t *output, bool force)
+static inline void signal_stop(struct obs_output *output);
+
+void obs_output_actual_stop(obs_output_t *output, bool force, uint64_t ts)
 {
+	bool call_stop = true;
+	bool was_reconnecting = false;
+
 	if (stopping(output))
 		return;
 	os_event_reset(output->stopping_event);
 
-	os_event_signal(output->reconnect_stop_event);
-	if (output->reconnect_thread_active)
-		pthread_join(output->reconnect_thread, NULL);
+	was_reconnecting = reconnecting(output) && !delay_active(output);
+	if (reconnecting(output)) {
+		os_event_signal(output->reconnect_stop_event);
+		if (output->reconnect_thread_active)
+			pthread_join(output->reconnect_thread, NULL);
+	}
 
-	if (output->context.data)
-		output->info.stop(output->context.data);
+	if (force) {
+		if (delay_active(output)) {
+			call_stop = delay_capturing(output);
+			os_atomic_set_bool(&output->delay_active, false);
+			os_atomic_set_bool(&output->delay_capturing, false);
+			output->stop_code = OBS_OUTPUT_SUCCESS;
+			obs_output_end_data_capture(output);
+			os_event_signal(output->stopping_event);
+		} else {
+			call_stop = data_active(output);
+		}
+	} else {
+		call_stop = data_active(output);
+	}
 
-	if (output->video)
-		log_frame_info(output);
+	if (output->context.data && call_stop) {
+		output->info.stop(output->context.data, ts);
 
-	if (delay_active(output) &&
-	    (force || !os_atomic_load_long(&output->delay_restart_refs))) {
-		os_atomic_set_bool(&output->delay_active, false);
-		obs_output_end_data_capture(output);
+	} else if (was_reconnecting) {
+		output->stop_code = OBS_OUTPUT_SUCCESS;
+		signal_stop(output);
+		os_event_signal(output->stopping_event);
 	}
-
-	if (force || !delay_active(output))
-		signal_stop(output, OBS_OUTPUT_SUCCESS);
 }
 
 void obs_output_stop(obs_output_t *output)
@@ -342,20 +363,30 @@ void obs_output_stop(obs_output_t *output)
 		return;
 	if (!output->context.data)
 		return;
+	if (!active(output) && !reconnecting(output))
+		return;
 
 	encoded = (output->info.flags & OBS_OUTPUT_ENCODED) != 0;
 
 	if (encoded && output->active_delay_ns) {
 		obs_output_delay_stop(output);
+
 	} else if (!stopping(output)) {
-		obs_output_actual_stop(output, false);
 		do_output_signal(output, "stopping");
+		obs_output_actual_stop(output, false, os_gettime_ns());
 	}
 }
 
 void obs_output_force_stop(obs_output_t *output)
 {
-	obs_output_actual_stop(output, true);
+	if (!obs_output_valid(output, "obs_output_force_stop"))
+		return;
+
+	if (!stopping(output)) {
+		output->stop_code = 0;
+		do_output_signal(output, "stopping");
+		obs_output_actual_stop(output, true, 0);
+	}
 }
 
 bool obs_output_active(const obs_output_t *output)
@@ -919,8 +950,7 @@ static inline void send_interleaved(struct obs_output *output)
 		output->total_frames++;
 
 	da_erase(output->interleaved_packets, 0);
-	if (!output->stopped)
-		output->info.encoded_packet(output->context.data, &out);
+	output->info.encoded_packet(output->context.data, &out);
 	obs_free_encoder_packet(&out);
 }
 
@@ -1252,6 +1282,9 @@ static void interleave_packets(void *data, struct encoder_packet *packet)
 	struct encoder_packet out;
 	bool                  was_started;
 
+	if (!active(output))
+		return;
+
 	if (packet->type == OBS_ENCODER_AUDIO)
 		packet->track_idx = get_track_index(output, packet);
 
@@ -1263,6 +1296,9 @@ static void interleave_packets(void *data, struct encoder_packet *packet)
 	    !packet->keyframe) {
 		discard_unused_audio_packets(output, packet->dts_usec);
 		pthread_mutex_unlock(&output->interleaved_mutex);
+
+		if (output->active_delay_ns)
+			obs_free_encoder_packet(packet);
 		return;
 	}
 
@@ -1303,22 +1339,24 @@ static void default_encoded_callback(void *param, struct encoder_packet *packet)
 {
 	struct obs_output *output = param;
 
-	if (packet->type == OBS_ENCODER_AUDIO)
-		packet->track_idx = get_track_index(output, packet);
+	if (data_active(output)) {
+		if (packet->type == OBS_ENCODER_AUDIO)
+			packet->track_idx = get_track_index(output, packet);
 
-	if (!output->stopped)
 		output->info.encoded_packet(output->context.data, packet);
+
+		if (packet->type == OBS_ENCODER_VIDEO)
+			output->total_frames++;
+	}
+
 	if (output->active_delay_ns)
 		obs_free_encoder_packet(packet);
-
-	if (packet->type == OBS_ENCODER_VIDEO)
-		output->total_frames++;
 }
 
 static void default_raw_video_callback(void *param, struct video_data *frame)
 {
 	struct obs_output *output = param;
-	if (!output->stopped)
+	if (data_active(output))
 		output->info.raw_video(output->context.data, frame);
 	output->total_frames++;
 }
@@ -1327,8 +1365,10 @@ static void default_raw_audio_callback(void *param, size_t mix_idx,
 		struct audio_data *frames)
 {
 	struct obs_output *output = param;
-	if (!output->stopped)
-		output->info.raw_audio(output->context.data, frames);
+	if (!data_active(output))
+		return;
+
+	output->info.raw_audio(output->context.data, frames);
 
 	UNUSED_PARAMETER(mix_idx);
 }
@@ -1430,13 +1470,13 @@ static inline void signal_reconnect_success(struct obs_output *output)
 	do_output_signal(output, "reconnect_success");
 }
 
-static inline void signal_stop(struct obs_output *output, int code)
+static inline void signal_stop(struct obs_output *output)
 {
 	struct calldata params;
 	uint8_t stack[128];
 
 	calldata_init_fixed(&params, stack, sizeof(stack));
-	calldata_set_int(&params, "code", code);
+	calldata_set_int(&params, "code", output->stop_code);
 	calldata_set_ptr(&params, "output", output);
 	signal_handler_signal(output->context.signals, "stop", &params);
 }
@@ -1594,6 +1634,7 @@ bool obs_output_begin_data_capture(obs_output_t *output, uint32_t flags)
 				has_service))
 		return false;
 
+	os_atomic_set_bool(&output->data_active, true);
 	hook_data_capture(output, encoded, has_video, has_audio);
 
 	if (has_service)
@@ -1672,20 +1713,37 @@ static void *end_data_capture_thread(void *data)
 	return NULL;
 }
 
-void obs_output_end_data_capture(obs_output_t *output)
+static void obs_output_end_data_capture_internal(obs_output_t *output,
+		bool signal)
 {
 	int ret;
 
 	if (!obs_output_valid(output, "obs_output_end_data_capture"))
 		return;
 
+	if (!active(output) || !data_active(output)) {
+		if (signal) {
+			signal_stop(output);
+			output->stop_code = OBS_OUTPUT_SUCCESS;
+		}
+		return;
+	}
+
 	if (delay_active(output)) {
 		os_atomic_set_bool(&output->delay_capturing, false);
-		os_event_signal(output->stopping_event);
-		return;
+
+		if (!os_atomic_load_long(&output->delay_restart_refs)) {
+			os_atomic_set_bool(&output->delay_active, false);
+		} else {
+			os_event_signal(output->stopping_event);
+			return;
+		}
 	}
 
-	if (!active(output)) return;
+	os_atomic_set_bool(&output->data_active, false);
+
+	if (output->video)
+		log_frame_info(output);
 
 	if (data_capture_ending(output))
 		pthread_join(output->end_data_capture_thread, NULL);
@@ -1698,6 +1756,16 @@ void obs_output_end_data_capture(obs_output_t *output)
 				"for output '%s'!", output->context.name);
 		end_data_capture_thread(output);
 	}
+
+	if (signal) {
+		signal_stop(output);
+		output->stop_code = OBS_OUTPUT_SUCCESS;
+	}
+}
+
+void obs_output_end_data_capture(obs_output_t *output)
+{
+	obs_output_end_data_capture_internal(output, true);
 }
 
 static void *reconnect_thread(void *param)
@@ -1729,12 +1797,11 @@ static void output_reconnect(struct obs_output *output)
 	}
 
 	if (output->reconnect_retries >= output->reconnect_retry_max) {
+		output->stop_code = OBS_OUTPUT_DISCONNECTED;
 		os_atomic_set_bool(&output->reconnecting, false);
-		if (delay_active(output)) {
+		if (delay_active(output))
 			os_atomic_set_bool(&output->delay_active, false);
-			obs_output_end_data_capture(output);
-		}
-		signal_stop(output, OBS_OUTPUT_DISCONNECTED);
+		obs_output_end_data_capture(output);
 		return;
 	}
 
@@ -1749,12 +1816,12 @@ static void output_reconnect(struct obs_output *output)
 
 	output->reconnect_retries++;
 
+	output->stop_code = OBS_OUTPUT_DISCONNECTED;
 	ret = pthread_create(&output->reconnect_thread, NULL,
 			&reconnect_thread, output);
 	if (ret < 0) {
 		blog(LOG_WARNING, "Failed to create reconnect thread");
 		os_atomic_set_bool(&output->reconnecting, false);
-		signal_stop(output, OBS_OUTPUT_DISCONNECTED);
 	} else {
 		blog(LOG_INFO, "Output '%s':  Reconnecting in %d seconds..",
 				output->context.name,
@@ -1764,22 +1831,30 @@ static void output_reconnect(struct obs_output *output)
 	}
 }
 
+static inline bool can_reconnect(const obs_output_t *output, int code)
+{
+	bool reconnect_active = output->reconnect_retry_max != 0;
+
+	return (reconnecting(output) && code != OBS_OUTPUT_SUCCESS) ||
+		(reconnect_active && code == OBS_OUTPUT_DISCONNECTED);
+}
+
 void obs_output_signal_stop(obs_output_t *output, int code)
 {
 	if (!obs_output_valid(output, "obs_output_signal_stop"))
 		return;
 
-	obs_output_end_data_capture(output);
+	output->stop_code = code;
 
-	if ((reconnecting(output) && code != OBS_OUTPUT_SUCCESS) ||
-	    code == OBS_OUTPUT_DISCONNECTED) {
+	if (can_reconnect(output, code)) {
+		if (delay_active(output))
+			os_atomic_inc_long(&output->delay_restart_refs);
+		obs_output_end_data_capture_internal(output, false);
 		output_reconnect(output);
 	} else {
-		if (delay_active(output)) {
+		if (delay_active(output))
 			os_atomic_set_bool(&output->delay_active, false);
-			obs_output_end_data_capture(output);
-		}
-		signal_stop(output, code);
+		obs_output_end_data_capture(output);
 	}
 }
 

+ 1 - 1
libobs/obs-output.h

@@ -42,7 +42,7 @@ struct obs_output_info {
 	void (*destroy)(void *data);
 
 	bool (*start)(void *data);
-	void (*stop)(void *data);
+	void (*stop)(void *data, uint64_t ts);
 
 	void (*raw_video)(void *data, struct video_data *frame);
 	void (*raw_audio)(void *data, struct audio_data *frames);

+ 43 - 15
plugins/obs-ffmpeg/obs-ffmpeg-mux.c

@@ -19,6 +19,7 @@
 #include <obs-avc.h>
 #include <util/dstr.h>
 #include <util/pipe.h>
+#include <util/threading.h>
 #include "ffmpeg-mux/ffmpeg-mux.h"
 
 #include <libavformat/avformat.h>
@@ -33,10 +34,12 @@
 struct ffmpeg_muxer {
 	obs_output_t      *output;
 	os_process_pipe_t *pipe;
+	int64_t           stop_ts;
 	struct dstr       path;
 	bool              sent_headers;
-	bool              active;
-	bool              capturing;
+	volatile bool     active;
+	volatile bool     stopping;
+	volatile bool     capturing;
 };
 
 static const char *ffmpeg_mux_getname(void *unused)
@@ -72,6 +75,21 @@ static void *ffmpeg_mux_create(obs_data_t *settings, obs_output_t *output)
 #define FFMPEG_MUX "ffmpeg-mux"
 #endif
 
+static inline bool capturing(struct ffmpeg_muxer *stream)
+{
+	return os_atomic_load_bool(&stream->capturing);
+}
+
+static inline bool stopping(struct ffmpeg_muxer *stream)
+{
+	return os_atomic_load_bool(&stream->stopping);
+}
+
+static inline bool active(struct ffmpeg_muxer *stream)
+{
+	return os_atomic_load_bool(&stream->active);
+}
+
 /* TODO: allow codecs other than h264 whenever we start using them */
 
 static void add_video_encoder_params(struct ffmpeg_muxer *stream,
@@ -223,8 +241,8 @@ static bool ffmpeg_mux_start(void *data)
 	}
 
 	/* write headers and start capture */
-	stream->active = true;
-	stream->capturing = true;
+	os_atomic_set_bool(&stream->active, true);
+	os_atomic_set_bool(&stream->capturing, true);
 	obs_output_begin_data_capture(stream->output, 0);
 
 	info("Writing file '%s'...", stream->path.array);
@@ -235,29 +253,32 @@ static int deactivate(struct ffmpeg_muxer *stream)
 {
 	int ret = -1;
 
-	if (stream->active) {
+	if (active(stream)) {
 		ret = os_process_pipe_destroy(stream->pipe);
 		stream->pipe = NULL;
 
-		stream->active = false;
-		stream->sent_headers = false;
+		os_atomic_set_bool(&stream->active, false);
+		os_atomic_set_bool(&stream->sent_headers, false);
 
 		info("Output of file '%s' stopped", stream->path.array);
 	}
 
+	if (stopping(stream))
+		obs_output_end_data_capture(stream->output);
+
+	os_atomic_set_bool(&stream->stopping, false);
 	return ret;
 }
 
-static void ffmpeg_mux_stop(void *data)
+static void ffmpeg_mux_stop(void *data, uint64_t ts)
 {
 	struct ffmpeg_muxer *stream = data;
 
-	if (stream->capturing) {
-		obs_output_end_data_capture(stream->output);
-		stream->capturing = false;
+	if (capturing(stream)) {
+		stream->stop_ts = (int64_t)ts / 1000LL;
+		os_atomic_set_bool(&stream->stopping, true);
+		os_atomic_set_bool(&stream->capturing, false);
 	}
-
-	deactivate(stream);
 }
 
 static void signal_failure(struct ffmpeg_muxer *stream)
@@ -271,7 +292,7 @@ static void signal_failure(struct ffmpeg_muxer *stream)
 	}
 
 	obs_output_signal_stop(stream->output, code);
-	stream->capturing = false;
+	os_atomic_set_bool(&stream->capturing, false);
 }
 
 static bool write_packet(struct ffmpeg_muxer *stream,
@@ -358,7 +379,7 @@ static void ffmpeg_mux_data(void *data, struct encoder_packet *packet)
 {
 	struct ffmpeg_muxer *stream = data;
 
-	if (!stream->active)
+	if (!active(stream))
 		return;
 
 	if (!stream->sent_headers) {
@@ -368,6 +389,13 @@ static void ffmpeg_mux_data(void *data, struct encoder_packet *packet)
 		stream->sent_headers = true;
 	}
 
+	if (stopping(stream)) {
+		if (packet->sys_dts_usec >= stream->stop_ts) {
+			deactivate(stream);
+			return;
+		}
+	}
+
 	write_packet(stream, packet);
 }
 

+ 67 - 4
plugins/obs-ffmpeg/obs-ffmpeg-output.c

@@ -89,6 +89,11 @@ struct ffmpeg_output {
 	bool               connecting;
 	pthread_t          start_thread;
 
+	uint64_t           audio_start_ts;
+	uint64_t           video_start_ts;
+	uint64_t           stop_ts;
+	volatile bool      stopping;
+
 	bool               write_thread_active;
 	pthread_mutex_t    write_mutex;
 	pthread_t          write_thread;
@@ -549,6 +554,11 @@ fail:
 
 /* ------------------------------------------------------------------------- */
 
+static inline bool stopping(struct ffmpeg_output *output)
+{
+	return os_atomic_load_bool(&output->stopping);
+}
+
 static const char *ffmpeg_output_getname(void *unused)
 {
 	UNUSED_PARAMETER(unused);
@@ -589,7 +599,7 @@ fail:
 	return NULL;
 }
 
-static void ffmpeg_output_stop(void *data);
+static void ffmpeg_output_full_stop(void *data);
 static void ffmpeg_deactivate(struct ffmpeg_output *output);
 
 static void ffmpeg_output_destroy(void *data)
@@ -600,7 +610,7 @@ static void ffmpeg_output_destroy(void *data)
 		if (output->connecting)
 			pthread_join(output->start_thread, NULL);
 
-		ffmpeg_output_stop(output);
+		ffmpeg_output_full_stop(output);
 
 		pthread_mutex_destroy(&output->write_mutex);
 		os_sem_destroy(output->write_sem);
@@ -648,6 +658,8 @@ static void receive_video(void *param, struct video_data *frame)
 
 	av_init_packet(&packet);
 
+	if (!output->video_start_ts)
+		output->video_start_ts = frame->timestamp;
 	if (!data->start_timestamp)
 		data->start_timestamp = frame->timestamp;
 
@@ -769,6 +781,8 @@ static bool prepare_audio(struct ffmpeg_data *data,
 			return false;
 
 		cutoff = data->start_timestamp - frame->timestamp;
+		output->timestamp += cutoff;
+
 		cutoff = cutoff * (uint64_t)data->audio_samplerate /
 			1000000000;
 
@@ -798,6 +812,9 @@ static void receive_audio(void *param, struct audio_data *frame)
 	if (!prepare_audio(data, frame, &in))
 		return;
 
+	if (!output->audio_start_ts)
+		output->audio_start_ts = in.timestamp;
+
 	frame_size_bytes = (size_t)data->frame_size * data->audio_size;
 
 	for (size_t i = 0; i < data->audio_planes; i++)
@@ -813,6 +830,26 @@ static void receive_audio(void *param, struct audio_data *frame)
 	}
 }
 
+static uint64_t get_packet_sys_dts(struct ffmpeg_output *output,
+		AVPacket *packet)
+{
+	struct ffmpeg_data *data = &output->ff_data;
+	uint64_t start_ts;
+
+	AVRational time_base;
+
+	if (data->video && data->video->index == packet->stream_index) {
+		time_base = data->video->time_base;
+		start_ts = output->video_start_ts;
+	} else {
+		time_base = data->audio->time_base;
+		start_ts = output->audio_start_ts;
+	}
+
+	return start_ts + (uint64_t)av_rescale_q(packet->dts,
+			time_base, (AVRational){1, 1000000000});
+}
+
 static int process_packet(struct ffmpeg_output *output)
 {
 	AVPacket packet;
@@ -835,6 +872,14 @@ static int process_packet(struct ffmpeg_output *output)
 			packet.size, packet.flags,
 			packet.stream_index, output->packets.num);*/
 
+	if (stopping(output)) {
+		uint64_t sys_ts = get_packet_sys_dts(output, &packet);
+		if (sys_ts >= output->stop_ts) {
+			ffmpeg_output_full_stop(output);
+			return 0;
+		}
+	}
+
 	ret = av_interleaved_write_frame(output->ff_data.output, &packet);
 	if (ret < 0) {
 		av_free_packet(&packet);
@@ -955,7 +1000,7 @@ static bool try_connect(struct ffmpeg_output *output)
 	if (ret != 0) {
 		blog(LOG_WARNING, "ffmpeg_output_start: failed to create write "
 		                  "thread.");
-		ffmpeg_output_stop(output);
+		ffmpeg_output_full_stop(output);
 		return false;
 	}
 
@@ -986,11 +1031,15 @@ static bool ffmpeg_output_start(void *data)
 	if (output->connecting)
 		return false;
 
+	os_atomic_set_bool(&output->stopping, false);
+	output->audio_start_ts = 0;
+	output->video_start_ts = 0;
+
 	ret = pthread_create(&output->start_thread, NULL, start_thread, output);
 	return (output->connecting = (ret == 0));
 }
 
-static void ffmpeg_output_stop(void *data)
+static void ffmpeg_output_full_stop(void *data)
 {
 	struct ffmpeg_output *output = data;
 
@@ -1000,6 +1049,20 @@ static void ffmpeg_output_stop(void *data)
 	}
 }
 
+static void ffmpeg_output_stop(void *data, uint64_t ts)
+{
+	struct ffmpeg_output *output = data;
+
+	if (output->active) {
+		if (ts == 0) {
+			ffmpeg_output_full_stop(output);
+		} else {
+			os_atomic_set_bool(&output->stopping, true);
+			output->stop_ts = ts;
+		}
+	}
+}
+
 static void ffmpeg_deactivate(struct ffmpeg_output *output)
 {
 	if (output->write_thread_active) {

+ 5 - 3
plugins/obs-outputs/flv-output.c

@@ -46,14 +46,14 @@ static const char *flv_output_getname(void *unused)
 	return obs_module_text("FLVOutput");
 }
 
-static void flv_output_stop(void *data);
+static void flv_output_stop(void *data, uint64_t ts);
 
 static void flv_output_destroy(void *data)
 {
 	struct flv_output *stream = data;
 
 	if (stream->active)
-		flv_output_stop(data);
+		flv_output_stop(data, 0);
 
 	dstr_free(&stream->path);
 	bfree(stream);
@@ -68,7 +68,7 @@ static void *flv_output_create(obs_data_t *settings, obs_output_t *output)
 	return stream;
 }
 
-static void flv_output_stop(void *data)
+static void flv_output_stop(void *data, uint64_t ts)
 {
 	struct flv_output *stream = data;
 
@@ -84,6 +84,8 @@ static void flv_output_stop(void *data)
 
 		info("FLV file output complete");
 	}
+
+	UNUSED_PARAMETER(ts);
 }
 
 static int write_packet(struct flv_output *stream,

+ 19 - 5
plugins/obs-outputs/rtmp-stream.c

@@ -63,6 +63,7 @@ struct rtmp_stream {
 
 	os_sem_t         *send_sem;
 	os_event_t       *stop_event;
+	uint64_t         stop_ts;
 
 	struct dstr      path, key;
 	struct dstr      username, password;
@@ -146,6 +147,7 @@ static void rtmp_stream_destroy(void *data)
 		if (stream->connecting)
 			pthread_join(stream->connect_thread, NULL);
 
+		stream->stop_ts = 0;
 		os_event_signal(stream->stop_event);
 
 		if (active(stream)) {
@@ -193,7 +195,7 @@ fail:
 	return NULL;
 }
 
-static void rtmp_stream_stop(void *data)
+static void rtmp_stream_stop(void *data, uint64_t ts)
 {
 	struct rtmp_stream *stream = data;
 
@@ -203,11 +205,12 @@ static void rtmp_stream_stop(void *data)
 	if (connecting(stream))
 		pthread_join(stream->connect_thread, NULL);
 
+	stream->stop_ts = ts / 1000ULL;
 	os_event_signal(stream->stop_event);
 
 	if (active(stream)) {
-		os_sem_post(stream->send_sem);
-		obs_output_end_data_capture(stream->output);
+		if (stream->stop_ts == 0)
+			os_sem_post(stream->send_sem);
 	}
 }
 
@@ -322,11 +325,20 @@ static void *send_thread(void *data)
 	while (os_sem_wait(stream->send_sem) == 0) {
 		struct encoder_packet packet;
 
-		if (stopping(stream))
+		if (stopping(stream) && stream->stop_ts == 0) {
 			break;
+		}
+
 		if (!get_next_packet(stream, &packet))
 			continue;
 
+		if (stopping(stream)) {
+			if (packet.sys_dts_usec >= (int64_t)stream->stop_ts) {
+				obs_free_encoder_packet(&packet);
+				break;
+			}
+		}
+
 		if (!stream->sent_headers) {
 			if (!send_headers(stream)) {
 				os_atomic_set_bool(&stream->disconnected, true);
@@ -351,6 +363,8 @@ static void *send_thread(void *data)
 	if (!stopping(stream)) {
 		pthread_detach(stream->send_thread);
 		obs_output_signal_stop(stream->output, OBS_OUTPUT_DISCONNECTED);
+	} else {
+		obs_output_end_data_capture(stream->output);
 	}
 
 	free_packets(stream);
@@ -795,7 +809,7 @@ static void rtmp_stream_data(void *data, struct encoder_packet *packet)
 	struct encoder_packet new_packet;
 	bool                  added_packet = false;
 
-	if (disconnected(stream))
+	if (disconnected(stream) || !active(stream))
 		return;
 
 	if (packet->type == OBS_ENCODER_VIDEO)