|
@@ -26,6 +26,9 @@
|
|
|
#include <stdlib.h>
|
|
|
#include "ffmpeg-mux.h"
|
|
|
|
|
|
+#include <util/threading.h>
|
|
|
+#include <util/platform.h>
|
|
|
+#include <util/circlebuf.h>
|
|
|
#include <util/dstr.h>
|
|
|
#include <libavcodec/avcodec.h>
|
|
|
#include <libavformat/avformat.h>
|
|
@@ -42,6 +45,8 @@
|
|
|
#define CODEC_FLAG_GLOBAL_H CODEC_FLAG_GLOBAL_HEADER
|
|
|
#endif
|
|
|
|
|
|
+#define AVIO_BUFFER_SIZE 65536
|
|
|
+
|
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
static char *global_stream_key = "";
|
|
@@ -117,6 +122,24 @@ struct audio_info {
|
|
|
AVCodecContext *ctx;
|
|
|
};
|
|
|
|
|
|
+struct io_header {
|
|
|
+ uint64_t seek_offset;
|
|
|
+ uint64_t data_length;
|
|
|
+};
|
|
|
+
|
|
|
+struct io_buffer {
|
|
|
+ bool active;
|
|
|
+ bool shutdown_requested;
|
|
|
+ bool output_error;
|
|
|
+ os_event_t *buffer_space_available_event;
|
|
|
+ os_event_t *new_data_available_event;
|
|
|
+ pthread_t io_thread;
|
|
|
+ pthread_mutex_t data_mutex;
|
|
|
+ FILE *output_file;
|
|
|
+ struct circlebuf data;
|
|
|
+ uint64_t next_pos;
|
|
|
+};
|
|
|
+
|
|
|
struct ffmpeg_mux {
|
|
|
AVFormatContext *output;
|
|
|
AVStream *video_stream;
|
|
@@ -129,7 +152,7 @@ struct ffmpeg_mux {
|
|
|
struct header *audio_header;
|
|
|
int num_audio_streams;
|
|
|
bool initialized;
|
|
|
- char error[4096];
|
|
|
+ struct io_buffer io;
|
|
|
};
|
|
|
|
|
|
static void header_free(struct header *header)
|
|
@@ -167,6 +190,29 @@ static void ffmpeg_mux_free(struct ffmpeg_mux *ffm)
|
|
|
av_write_trailer(ffm->output);
|
|
|
}
|
|
|
|
|
|
+ // If we're writing to a file with the circlebuf, shut it
|
|
|
+ // down gracefully
|
|
|
+ if (ffm->io.active) {
|
|
|
+ os_atomic_set_bool(&ffm->io.shutdown_requested, true);
|
|
|
+
|
|
|
+ // Wakes up the I/O thread and waits for it to finish
|
|
|
+ pthread_mutex_lock(&ffm->io.data_mutex);
|
|
|
+ os_event_signal(ffm->io.new_data_available_event);
|
|
|
+ pthread_mutex_unlock(&ffm->io.data_mutex);
|
|
|
+ pthread_join(ffm->io.io_thread, NULL);
|
|
|
+
|
|
|
+ // Cleanup everything else
|
|
|
+ av_free(ffm->output->pb->buffer);
|
|
|
+ avio_context_free(&ffm->output->pb);
|
|
|
+
|
|
|
+ os_event_destroy(ffm->io.new_data_available_event);
|
|
|
+ os_event_destroy(ffm->io.buffer_space_available_event);
|
|
|
+
|
|
|
+ pthread_mutex_destroy(&ffm->io.data_mutex);
|
|
|
+
|
|
|
+ circlebuf_free(&ffm->io.data);
|
|
|
+ }
|
|
|
+
|
|
|
free_avformat(ffm);
|
|
|
|
|
|
header_free(&ffm->video_header);
|
|
@@ -612,6 +658,219 @@ static inline bool ffmpeg_mux_get_extra_data(struct ffmpeg_mux *ffm)
|
|
|
#pragma warning(disable : 4996)
|
|
|
#endif
|
|
|
|
|
|
+#define CHUNK_SIZE 1048576
|
|
|
+
|
|
|
+static void *ffmpeg_mux_io_thread(void *data)
|
|
|
+{
|
|
|
+ struct ffmpeg_mux *ffm = data;
|
|
|
+
|
|
|
+ // Chunk collects the writes into a larger batch
|
|
|
+ size_t chunk_used = 0;
|
|
|
+
|
|
|
+ unsigned char *chunk = malloc(CHUNK_SIZE);
|
|
|
+ if (!chunk) {
|
|
|
+ os_atomic_set_bool(&ffm->io.output_error, true);
|
|
|
+ fprintf(stderr, "Error allocating memory for output\n");
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ bool shutting_down;
|
|
|
+ bool want_seek = false;
|
|
|
+ bool force_flush_chunk = false;
|
|
|
+
|
|
|
+ // current_seek_position is a virtual position updated as we read from
|
|
|
+ // the buffer, if it becomes discontinuous due to a seek request from
|
|
|
+ // ffmpeg, then we flush the chunk. next_seek_position is the actual
|
|
|
+ // offset we should seek to when we write the chunk.
|
|
|
+ uint64_t current_seek_position = 0;
|
|
|
+ uint64_t next_seek_position;
|
|
|
+
|
|
|
+ for (;;) {
|
|
|
+ // Wait for ffmpeg to write data to the buffer
|
|
|
+ os_event_wait(ffm->io.new_data_available_event);
|
|
|
+
|
|
|
+ // Loop to write in chunk_size chunks
|
|
|
+ for (;;) {
|
|
|
+ shutting_down = os_atomic_load_bool(
|
|
|
+ &ffm->io.shutdown_requested);
|
|
|
+
|
|
|
+ pthread_mutex_lock(&ffm->io.data_mutex);
|
|
|
+
|
|
|
+ // Fetch as many writes as possible from the circlebuf
|
|
|
+ // and fill up our local chunk. This may involve seeking
|
|
|
+ // if ffmpeg needs to, so take care of that as well.
|
|
|
+ for (;;) {
|
|
|
+ size_t available = ffm->io.data.size;
|
|
|
+
|
|
|
+ // Buffer is empty (now) or was already empty (we got
|
|
|
+ // woken up to exit)
|
|
|
+ if (!available)
|
|
|
+ break;
|
|
|
+
|
|
|
+ // Get seek offset and data size
|
|
|
+ struct io_header header;
|
|
|
+ circlebuf_peek_front(&ffm->io.data, &header,
|
|
|
+ sizeof(header));
|
|
|
+
|
|
|
+ // Do we need to seek?
|
|
|
+ if (header.seek_offset !=
|
|
|
+ current_seek_position) {
|
|
|
+
|
|
|
+ // If there's already part of a chunk pending,
|
|
|
+ // flush it at the current offset. Similarly,
|
|
|
+ // if we already plan to seek, then seek.
|
|
|
+ if (chunk_used || want_seek) {
|
|
|
+ force_flush_chunk = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Mark that we need to seek and where to
|
|
|
+ want_seek = true;
|
|
|
+ next_seek_position = header.seek_offset;
|
|
|
+
|
|
|
+ // Update our virtual position
|
|
|
+ current_seek_position =
|
|
|
+ header.seek_offset;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Make sure there's enough room for the data, if
|
|
|
+ // not then force a flush
|
|
|
+ if (header.data_length + chunk_used >
|
|
|
+ CHUNK_SIZE) {
|
|
|
+ force_flush_chunk = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Remove header that we already read
|
|
|
+ circlebuf_pop_front(&ffm->io.data, NULL,
|
|
|
+ sizeof(header));
|
|
|
+
|
|
|
+ // Copy from the buffer to our local chunk
|
|
|
+ circlebuf_pop_front(&ffm->io.data,
|
|
|
+ chunk + chunk_used,
|
|
|
+ header.data_length);
|
|
|
+
|
|
|
+ // Update offsets
|
|
|
+ chunk_used += header.data_length;
|
|
|
+ current_seek_position += header.data_length;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Signal that there is more room in the buffer
|
|
|
+ os_event_signal(ffm->io.buffer_space_available_event);
|
|
|
+
|
|
|
+ // Try to avoid lots of small writes unless this was the final
|
|
|
+ // data left in the buffer. The buffer might be entirely empty
|
|
|
+ // if we were woken up to exit.
|
|
|
+ if (!force_flush_chunk &&
|
|
|
+ (!chunk_used ||
|
|
|
+ (chunk_used < 65536 && !shutting_down))) {
|
|
|
+ os_event_reset(
|
|
|
+ ffm->io.new_data_available_event);
|
|
|
+ pthread_mutex_unlock(&ffm->io.data_mutex);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ pthread_mutex_unlock(&ffm->io.data_mutex);
|
|
|
+
|
|
|
+ // Seek if we need to
|
|
|
+ if (want_seek) {
|
|
|
+ os_fseeki64(ffm->io.output_file,
|
|
|
+ next_seek_position, SEEK_SET);
|
|
|
+ current_seek_position = next_seek_position;
|
|
|
+ want_seek = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Write the current chunk to the output file
|
|
|
+ if (fwrite(chunk, chunk_used, 1, ffm->io.output_file) !=
|
|
|
+ 1) {
|
|
|
+ os_atomic_set_bool(&ffm->io.output_error, true);
|
|
|
+ fprintf(stderr, "Error writing to '%s', %s\n",
|
|
|
+ ffm->params.printable_file.array,
|
|
|
+ strerror(errno));
|
|
|
+ goto error;
|
|
|
+ }
|
|
|
+
|
|
|
+ chunk_used = 0;
|
|
|
+ force_flush_chunk = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ // If this was the last chunk, time to exit
|
|
|
+ if (shutting_down)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+error:
|
|
|
+ if (chunk)
|
|
|
+ free(chunk);
|
|
|
+
|
|
|
+ fclose(ffm->io.output_file);
|
|
|
+ return NULL;
|
|
|
+}
|
|
|
+
|
|
|
+static int64_t ffmpeg_mux_seek_av_buffer(void *opaque, int64_t offset,
|
|
|
+ int whence)
|
|
|
+{
|
|
|
+ struct ffmpeg_mux *ffm = opaque;
|
|
|
+
|
|
|
+ // If the output thread failed, signal that back up the stack
|
|
|
+ if (os_atomic_load_bool(&ffm->io.output_error))
|
|
|
+ return -1;
|
|
|
+
|
|
|
+ // Update where the next write should go
|
|
|
+ pthread_mutex_lock(&ffm->io.data_mutex);
|
|
|
+ if (whence == SEEK_SET)
|
|
|
+ ffm->io.next_pos = offset;
|
|
|
+ else if (whence == SEEK_CUR)
|
|
|
+ ffm->io.next_pos += offset;
|
|
|
+ pthread_mutex_unlock(&ffm->io.data_mutex);
|
|
|
+
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+static int ffmpeg_mux_write_av_buffer(void *opaque, uint8_t *buf, int buf_size)
|
|
|
+{
|
|
|
+ struct ffmpeg_mux *ffm = opaque;
|
|
|
+
|
|
|
+ // If the output thread failed, signal that back up the stack
|
|
|
+ if (os_atomic_load_bool(&ffm->io.output_error))
|
|
|
+ return -1;
|
|
|
+
|
|
|
+ for (;;) {
|
|
|
+ pthread_mutex_lock(&ffm->io.data_mutex);
|
|
|
+
|
|
|
+ // Avoid unbounded growth of the circlebuf, cap to 256 MB
|
|
|
+ if (ffm->io.data.capacity >= 256 * 1048576 &&
|
|
|
+ ffm->io.data.capacity - ffm->io.data.size <
|
|
|
+ buf_size + sizeof(struct io_header)) {
|
|
|
+ // No space, wait for the I/O thread to make space
|
|
|
+ os_event_reset(ffm->io.buffer_space_available_event);
|
|
|
+ pthread_mutex_unlock(&ffm->io.data_mutex);
|
|
|
+ os_event_wait(ffm->io.buffer_space_available_event);
|
|
|
+ } else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ struct io_header header;
|
|
|
+
|
|
|
+ header.data_length = buf_size;
|
|
|
+ header.seek_offset = ffm->io.next_pos;
|
|
|
+
|
|
|
+ // Copy the data into the buffer
|
|
|
+ circlebuf_push_back(&ffm->io.data, &header, sizeof(header));
|
|
|
+ circlebuf_push_back(&ffm->io.data, buf, buf_size);
|
|
|
+
|
|
|
+ // Advance the next write position
|
|
|
+ ffm->io.next_pos += buf_size;
|
|
|
+
|
|
|
+ // Tell the I/O thread that there's new data to be written
|
|
|
+ os_event_signal(ffm->io.new_data_available_event);
|
|
|
+
|
|
|
+ pthread_mutex_unlock(&ffm->io.data_mutex);
|
|
|
+
|
|
|
+ return buf_size;
|
|
|
+}
|
|
|
+
|
|
|
static inline int open_output_file(struct ffmpeg_mux *ffm)
|
|
|
{
|
|
|
#if LIBAVFORMAT_VERSION_INT < AV_VERSION_INT(59, 0, 100)
|
|
@@ -622,14 +881,42 @@ static inline int open_output_file(struct ffmpeg_mux *ffm)
|
|
|
int ret;
|
|
|
|
|
|
if ((format->flags & AVFMT_NOFILE) == 0) {
|
|
|
- ret = avio_open(&ffm->output->pb, ffm->params.file,
|
|
|
- AVIO_FLAG_WRITE);
|
|
|
- if (ret < 0) {
|
|
|
+ // If not outputting to a network, write to a circlebuf
|
|
|
+ // instead of relying on ffmpeg disk output. This hopefully
|
|
|
+ // works around too small buffers somewhere causing output
|
|
|
+ // stalls when recording.
|
|
|
+
|
|
|
+ // We're in charge of managing the actual file now
|
|
|
+ ffm->io.output_file = os_fopen(ffm->params.file, "wb");
|
|
|
+ if (!ffm->io.output_file) {
|
|
|
fprintf(stderr, "Couldn't open '%s', %s\n",
|
|
|
ffm->params.printable_file.array,
|
|
|
- av_err2str(ret));
|
|
|
+ strerror(errno));
|
|
|
return FFM_ERROR;
|
|
|
}
|
|
|
+
|
|
|
+ // Start at 1MB, this can grow up to 256 MB depending
|
|
|
+ // how fast data is going in and out (limited in
|
|
|
+ // ffmpeg_mux_write_av_buffer)
|
|
|
+ circlebuf_reserve(&ffm->io.data, 1048576);
|
|
|
+
|
|
|
+ pthread_mutex_init(&ffm->io.data_mutex, NULL);
|
|
|
+
|
|
|
+ os_event_init(&ffm->io.buffer_space_available_event,
|
|
|
+ OS_EVENT_TYPE_AUTO);
|
|
|
+ os_event_init(&ffm->io.new_data_available_event,
|
|
|
+ OS_EVENT_TYPE_AUTO);
|
|
|
+
|
|
|
+ pthread_create(&ffm->io.io_thread, NULL, ffmpeg_mux_io_thread,
|
|
|
+ ffm);
|
|
|
+
|
|
|
+ unsigned char *avio_ctx_buffer = av_malloc(AVIO_BUFFER_SIZE);
|
|
|
+
|
|
|
+ ffm->output->pb = avio_alloc_context(
|
|
|
+ avio_ctx_buffer, AVIO_BUFFER_SIZE, 1, ffm, NULL,
|
|
|
+ ffmpeg_mux_write_av_buffer, ffmpeg_mux_seek_av_buffer);
|
|
|
+
|
|
|
+ ffm->io.active = true;
|
|
|
}
|
|
|
|
|
|
AVDictionary *dict = NULL;
|