123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- /*
- * Copyright (c) 2024 Dennis Sädtler <[email protected]>
- *
- * Permission to use, copy, modify, and distribute this software for any
- * purpose with or without fee is hereby granted, provided that the above
- * copyright notice and this permission notice appear in all copies.
- *
- * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- */
- #include "buffered-file-serializer.h"
- #include <inttypes.h>
- #include "platform.h"
- #include "threading.h"
- #include "deque.h"
- #include "dstr.h"
/* Default upper bound for data queued ahead of the I/O thread: 256 MiB */
static const size_t DEFAULT_BUF_SIZE = (size_t)256 << 20;
/* Default size of a single queued record / write batch: 1 MiB */
static const size_t DEFAULT_CHUNK_SIZE = (size_t)1 << 20;
- /* ========================================================================== */
- /* Buffered writer based on ffmpeg-mux implementation */
/*
 * Record header stored in the deque directly in front of each payload.
 * The producer pushes one header followed by data_length payload bytes;
 * the I/O thread pops them in the same order.
 */
struct io_header {
	/* Absolute file offset at which the payload has to be written */
	uint64_t seek_offset;
	/* Number of payload bytes that follow this header in the deque */
	uint64_t data_length;
};
- struct io_buffer {
- bool active;
- bool shutdown_requested;
- bool output_error;
- os_event_t *buffer_space_available_event;
- os_event_t *new_data_available_event;
- pthread_t io_thread;
- pthread_mutex_t data_mutex;
- FILE *output_file;
- struct deque data;
- uint64_t next_pos;
- size_t buffer_size;
- size_t chunk_size;
- };
- struct file_output_data {
- struct dstr filename;
- struct io_buffer io;
- };
- static void *io_thread(void *opaque)
- {
- struct file_output_data *out = opaque;
- os_set_thread_name("buffered writer i/o thread");
- // Chunk collects the writes into a larger batch
- size_t chunk_used = 0;
- size_t chunk_size = out->io.chunk_size;
- unsigned char *chunk = bmalloc(chunk_size);
- if (!chunk) {
- os_atomic_set_bool(&out->io.output_error, true);
- fprintf(stderr, "Error allocating memory for output\n");
- goto error;
- }
- bool shutting_down;
- bool want_seek = false;
- bool force_flush_chunk = false;
- // current_seek_position is a virtual position updated as we read from
- // the buffer, if it becomes discontinuous due to a seek request we
- // flush the chunk. next_seek_position is the actual offset we should
- // seek to when we write the chunk.
- uint64_t current_seek_position = 0;
- uint64_t next_seek_position;
- for (;;) {
- // Wait for data to be written to the buffer
- os_event_wait(out->io.new_data_available_event);
- // Loop to write in chunk_size chunks
- for (;;) {
- pthread_mutex_lock(&out->io.data_mutex);
- shutting_down = os_atomic_load_bool(&out->io.shutdown_requested);
- // Fetch as many writes as possible from the deque
- // and fill up our local chunk. This may involve
- // seeking, so take care of that as well.
- for (;;) {
- size_t available = out->io.data.size;
- // Buffer is empty (now) or was already empty (we got
- // woken up to exit)
- if (!available)
- break;
- // Get seek offset and data size
- struct io_header header;
- deque_peek_front(&out->io.data, &header, sizeof(header));
- // Do we need to seek?
- if (header.seek_offset != current_seek_position) {
- // If there's already part of a chunk pending,
- // flush it at the current offset. Similarly,
- // if we already plan to seek, then seek.
- if (chunk_used || want_seek) {
- force_flush_chunk = true;
- break;
- }
- // Mark that we need to seek and where to
- want_seek = true;
- next_seek_position = header.seek_offset;
- // Update our virtual position
- current_seek_position = header.seek_offset;
- }
- // Make sure there's enough room for the data, if
- // not then force a flush
- if (header.data_length + chunk_used > chunk_size) {
- force_flush_chunk = true;
- break;
- }
- // Remove header that we already read
- deque_pop_front(&out->io.data, NULL, sizeof(header));
- // Copy from the buffer to our local chunk
- deque_pop_front(&out->io.data, chunk + chunk_used, header.data_length);
- // Update offsets
- chunk_used += header.data_length;
- current_seek_position += header.data_length;
- }
- // Signal that there is more room in the buffer
- os_event_signal(out->io.buffer_space_available_event);
- // Try to avoid lots of small writes unless this was the final
- // data left in the buffer. The buffer might be entirely empty
- // if we were woken up to exit.
- if (!force_flush_chunk && (!chunk_used || (chunk_used < 65536 && !shutting_down))) {
- os_event_reset(out->io.new_data_available_event);
- pthread_mutex_unlock(&out->io.data_mutex);
- break;
- }
- pthread_mutex_unlock(&out->io.data_mutex);
- // Seek if we need to
- if (want_seek) {
- os_fseeki64(out->io.output_file, next_seek_position, SEEK_SET);
- // Update the next virtual position, making sure to take
- // into account the size of the chunk we're about to write.
- current_seek_position = next_seek_position + chunk_used;
- want_seek = false;
- // If we did a seek but do not have any data left to write
- // return to the start of the loop.
- if (!chunk_used) {
- force_flush_chunk = false;
- continue;
- }
- }
- // Write the current chunk to the output file
- size_t bytes_written = fwrite(chunk, 1, chunk_used, out->io.output_file);
- if (bytes_written != chunk_used) {
- blog(LOG_ERROR, "Error writing to '%s': %s (%zu != %zu)\n", out->filename.array,
- strerror(errno), bytes_written, chunk_used);
- os_atomic_set_bool(&out->io.output_error, true);
- goto error;
- }
- chunk_used = 0;
- force_flush_chunk = false;
- }
- // If this was the last chunk, time to exit
- if (shutting_down)
- break;
- }
- error:
- if (chunk)
- bfree(chunk);
- fclose(out->io.output_file);
- return NULL;
- }
- /* ========================================================================== */
- /* Serializer Implementation */
- static int64_t file_output_seek(void *opaque, int64_t offset, enum serialize_seek_type seek_type)
- {
- struct file_output_data *out = opaque;
- // If the output thread failed, signal that back up the stack
- if (os_atomic_load_bool(&out->io.output_error))
- return -1;
- // Update where the next write should go
- pthread_mutex_lock(&out->io.data_mutex);
- switch (seek_type) {
- case SERIALIZE_SEEK_START:
- out->io.next_pos = offset;
- break;
- case SERIALIZE_SEEK_CURRENT:
- out->io.next_pos += offset;
- break;
- case SERIALIZE_SEEK_END:
- out->io.next_pos -= offset;
- break;
- }
- pthread_mutex_unlock(&out->io.data_mutex);
- return (int64_t)out->io.next_pos;
- }
#ifndef _WIN32
/*
 * size_t helpers, only compiled when _WIN32 is not defined.
 * NOTE(review): presumably Windows builds get min/max from platform headers -
 * confirm.
 */

/* Returns the larger of a and b. */
static inline size_t max(size_t a, size_t b)
{
	if (a > b)
		return a;
	return b;
}

/* Returns the smaller of a and b. */
static inline size_t min(size_t a, size_t b)
{
	if (a < b)
		return a;
	return b;
}
#endif
- static size_t file_output_write(void *opaque, const void *buf, size_t buf_size)
- {
- struct file_output_data *out = opaque;
- if (!buf_size)
- return 0;
- // Split writes into at chunks that are at most chunk_size bytes
- uintptr_t ptr = (uintptr_t)buf;
- size_t remaining = buf_size;
- while (remaining) {
- if (os_atomic_load_bool(&out->io.output_error))
- return 0;
- pthread_mutex_lock(&out->io.data_mutex);
- size_t next_chunk_size = min(remaining, out->io.chunk_size);
- // Avoid unbounded growth of the deque, cap to buffer_size
- size_t cap = max(out->io.data.capacity, out->io.buffer_size);
- size_t free_space = cap - out->io.data.size;
- if (free_space < next_chunk_size + sizeof(struct io_header)) {
- blog(LOG_DEBUG, "Waiting for I/O thread...");
- // No space, wait for the I/O thread to make space
- os_event_reset(out->io.buffer_space_available_event);
- pthread_mutex_unlock(&out->io.data_mutex);
- os_event_wait(out->io.buffer_space_available_event);
- continue;
- }
- // Calculate how many chunks we can fit into the buffer
- size_t num_chunks = free_space / (next_chunk_size + sizeof(struct io_header));
- while (remaining && num_chunks--) {
- struct io_header header = {
- .data_length = next_chunk_size,
- .seek_offset = out->io.next_pos,
- };
- // Copy the data into the buffer
- deque_push_back(&out->io.data, &header, sizeof(header));
- deque_push_back(&out->io.data, (const void *)ptr, next_chunk_size);
- // Advance the next write position
- out->io.next_pos += next_chunk_size;
- // Update remainder and advance data pointer
- remaining -= next_chunk_size;
- ptr += next_chunk_size;
- next_chunk_size = min(remaining, out->io.chunk_size);
- }
- // Tell the I/O thread that there's new data to be written
- os_event_signal(out->io.new_data_available_event);
- pthread_mutex_unlock(&out->io.data_mutex);
- }
- return buf_size - remaining;
- }
- static int64_t file_output_get_pos(void *opaque)
- {
- struct file_output_data *out = opaque;
- // If thread failed return -1
- if (os_atomic_load_bool(&out->io.output_error))
- return -1;
- return (int64_t)out->io.next_pos;
- }
/*
 * Opens a buffered file serializer on path using the default buffer and
 * chunk sizes. Returns what buffered_file_serializer_init() returns.
 */
bool buffered_file_serializer_init_defaults(struct serializer *s, const char *path)
{
	/* A size of 0 makes the full initializer pick its built-in default */
	const size_t use_default = 0;

	return buffered_file_serializer_init(s, path, use_default, use_default);
}
- bool buffered_file_serializer_init(struct serializer *s, const char *path, size_t max_bufsize, size_t chunk_size)
- {
- struct file_output_data *out;
- out = bzalloc(sizeof(*out));
- dstr_init_copy(&out->filename, path);
- out->io.output_file = os_fopen(path, "wb");
- if (!out->io.output_file) {
- dstr_free(&out->filename);
- bfree(out);
- return false;
- }
- out->io.buffer_size = max_bufsize ? max_bufsize : DEFAULT_BUF_SIZE;
- out->io.chunk_size = chunk_size ? chunk_size : DEFAULT_CHUNK_SIZE;
- // Start at 1MB, this can grow up to max_bufsize depending
- // on how fast data is going in and out.
- deque_reserve(&out->io.data, 1048576);
- pthread_mutex_init(&out->io.data_mutex, NULL);
- os_event_init(&out->io.buffer_space_available_event, OS_EVENT_TYPE_AUTO);
- os_event_init(&out->io.new_data_available_event, OS_EVENT_TYPE_AUTO);
- pthread_create(&out->io.io_thread, NULL, io_thread, out);
- out->io.active = true;
- s->data = out;
- s->read = NULL;
- s->write = file_output_write;
- s->seek = file_output_seek;
- s->get_pos = file_output_get_pos;
- return true;
- }
- void buffered_file_serializer_free(struct serializer *s)
- {
- struct file_output_data *out = s->data;
- if (!out)
- return;
- if (out->io.active) {
- os_atomic_set_bool(&out->io.shutdown_requested, true);
- // Wakes up the I/O thread and waits for it to finish
- pthread_mutex_lock(&out->io.data_mutex);
- os_event_signal(out->io.new_data_available_event);
- pthread_mutex_unlock(&out->io.data_mutex);
- pthread_join(out->io.io_thread, NULL);
- os_event_destroy(out->io.new_data_available_event);
- os_event_destroy(out->io.buffer_space_available_event);
- pthread_mutex_destroy(&out->io.data_mutex);
- blog(LOG_DEBUG, "Final buffer capacity: %zu KiB", out->io.data.capacity / 1024);
- deque_free(&out->io.data);
- }
- dstr_free(&out->filename);
- bfree(out);
- }
|