// buffered-file-serializer.c
  1. /*
  2. * Copyright (c) 2024 Dennis Sädtler <[email protected]>
  3. *
  4. * Permission to use, copy, modify, and distribute this software for any
  5. * purpose with or without fee is hereby granted, provided that the above
  6. * copyright notice and this permission notice appear in all copies.
  7. *
  8. * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9. * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  10. * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  11. * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  12. * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  13. * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  14. * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. */
  16. #include "buffered-file-serializer.h"
  17. #include <inttypes.h>
  18. #include "platform.h"
  19. #include "threading.h"
  20. #include "deque.h"
  21. #include "dstr.h"
  // Buffer size used when the caller passes 0 for max_bufsize: 256 MiB.
  static const size_t DEFAULT_BUF_SIZE = (size_t)256 * 1024 * 1024;
  // Chunk size used when the caller passes 0 for chunk_size: 1 MiB.
  static const size_t DEFAULT_CHUNK_SIZE = (size_t)1024 * 1024;
  24. /* ========================================================================== */
  25. /* Buffered writer based on ffmpeg-mux implementation */
  /*
   * Record header stored in the deque in front of every queued payload.
   * The writer pushes one header followed by data_length payload bytes; the
   * I/O thread peeks/pops them back in the same order.
   */
  struct io_header {
      /* Absolute file offset at which the payload is to be written. */
      uint64_t seek_offset;
      /* Number of payload bytes that follow this header in the deque. */
      uint64_t data_length;
  };
  30. struct io_buffer {
  31. bool active;
  32. bool shutdown_requested;
  33. bool output_error;
  34. os_event_t *buffer_space_available_event;
  35. os_event_t *new_data_available_event;
  36. pthread_t io_thread;
  37. pthread_mutex_t data_mutex;
  38. FILE *output_file;
  39. struct deque data;
  40. uint64_t next_pos;
  41. size_t buffer_size;
  42. size_t chunk_size;
  43. };
  44. struct file_output_data {
  45. struct dstr filename;
  46. struct io_buffer io;
  47. };
  48. static void *io_thread(void *opaque)
  49. {
  50. struct file_output_data *out = opaque;
  51. os_set_thread_name("buffered writer i/o thread");
  52. // Chunk collects the writes into a larger batch
  53. size_t chunk_used = 0;
  54. size_t chunk_size = out->io.chunk_size;
  55. unsigned char *chunk = bmalloc(chunk_size);
  56. if (!chunk) {
  57. os_atomic_set_bool(&out->io.output_error, true);
  58. fprintf(stderr, "Error allocating memory for output\n");
  59. goto error;
  60. }
  61. bool shutting_down;
  62. bool want_seek = false;
  63. bool force_flush_chunk = false;
  64. // current_seek_position is a virtual position updated as we read from
  65. // the buffer, if it becomes discontinuous due to a seek request we
  66. // flush the chunk. next_seek_position is the actual offset we should
  67. // seek to when we write the chunk.
  68. uint64_t current_seek_position = 0;
  69. uint64_t next_seek_position;
  70. for (;;) {
  71. // Wait for data to be written to the buffer
  72. os_event_wait(out->io.new_data_available_event);
  73. // Loop to write in chunk_size chunks
  74. for (;;) {
  75. pthread_mutex_lock(&out->io.data_mutex);
  76. shutting_down = os_atomic_load_bool(&out->io.shutdown_requested);
  77. // Fetch as many writes as possible from the deque
  78. // and fill up our local chunk. This may involve
  79. // seeking, so take care of that as well.
  80. for (;;) {
  81. size_t available = out->io.data.size;
  82. // Buffer is empty (now) or was already empty (we got
  83. // woken up to exit)
  84. if (!available)
  85. break;
  86. // Get seek offset and data size
  87. struct io_header header;
  88. deque_peek_front(&out->io.data, &header, sizeof(header));
  89. // Do we need to seek?
  90. if (header.seek_offset != current_seek_position) {
  91. // If there's already part of a chunk pending,
  92. // flush it at the current offset. Similarly,
  93. // if we already plan to seek, then seek.
  94. if (chunk_used || want_seek) {
  95. force_flush_chunk = true;
  96. break;
  97. }
  98. // Mark that we need to seek and where to
  99. want_seek = true;
  100. next_seek_position = header.seek_offset;
  101. // Update our virtual position
  102. current_seek_position = header.seek_offset;
  103. }
  104. // Make sure there's enough room for the data, if
  105. // not then force a flush
  106. if (header.data_length + chunk_used > chunk_size) {
  107. force_flush_chunk = true;
  108. break;
  109. }
  110. // Remove header that we already read
  111. deque_pop_front(&out->io.data, NULL, sizeof(header));
  112. // Copy from the buffer to our local chunk
  113. deque_pop_front(&out->io.data, chunk + chunk_used, header.data_length);
  114. // Update offsets
  115. chunk_used += header.data_length;
  116. current_seek_position += header.data_length;
  117. }
  118. // Signal that there is more room in the buffer
  119. os_event_signal(out->io.buffer_space_available_event);
  120. // Try to avoid lots of small writes unless this was the final
  121. // data left in the buffer. The buffer might be entirely empty
  122. // if we were woken up to exit.
  123. if (!force_flush_chunk && (!chunk_used || (chunk_used < 65536 && !shutting_down))) {
  124. os_event_reset(out->io.new_data_available_event);
  125. pthread_mutex_unlock(&out->io.data_mutex);
  126. break;
  127. }
  128. pthread_mutex_unlock(&out->io.data_mutex);
  129. // Seek if we need to
  130. if (want_seek) {
  131. os_fseeki64(out->io.output_file, next_seek_position, SEEK_SET);
  132. // Update the next virtual position, making sure to take
  133. // into account the size of the chunk we're about to write.
  134. current_seek_position = next_seek_position + chunk_used;
  135. want_seek = false;
  136. // If we did a seek but do not have any data left to write
  137. // return to the start of the loop.
  138. if (!chunk_used) {
  139. force_flush_chunk = false;
  140. continue;
  141. }
  142. }
  143. // Write the current chunk to the output file
  144. size_t bytes_written = fwrite(chunk, 1, chunk_used, out->io.output_file);
  145. if (bytes_written != chunk_used) {
  146. blog(LOG_ERROR, "Error writing to '%s': %s (%zu != %zu)\n", out->filename.array,
  147. strerror(errno), bytes_written, chunk_used);
  148. os_atomic_set_bool(&out->io.output_error, true);
  149. goto error;
  150. }
  151. chunk_used = 0;
  152. force_flush_chunk = false;
  153. }
  154. // If this was the last chunk, time to exit
  155. if (shutting_down)
  156. break;
  157. }
  158. error:
  159. if (chunk)
  160. bfree(chunk);
  161. fclose(out->io.output_file);
  162. return NULL;
  163. }
  164. /* ========================================================================== */
  165. /* Serializer Implementation */
  166. static int64_t file_output_seek(void *opaque, int64_t offset, enum serialize_seek_type seek_type)
  167. {
  168. struct file_output_data *out = opaque;
  169. // If the output thread failed, signal that back up the stack
  170. if (os_atomic_load_bool(&out->io.output_error))
  171. return -1;
  172. // Update where the next write should go
  173. pthread_mutex_lock(&out->io.data_mutex);
  174. switch (seek_type) {
  175. case SERIALIZE_SEEK_START:
  176. out->io.next_pos = offset;
  177. break;
  178. case SERIALIZE_SEEK_CURRENT:
  179. out->io.next_pos += offset;
  180. break;
  181. case SERIALIZE_SEEK_END:
  182. out->io.next_pos -= offset;
  183. break;
  184. }
  185. pthread_mutex_unlock(&out->io.data_mutex);
  186. return (int64_t)out->io.next_pos;
  187. }
  #ifndef _WIN32
  // size_t helpers, only defined when _WIN32 is not set (presumably the
  // Windows headers already supply min/max - verify against platform.h).

  // Returns the larger of a and b (b when they are equal).
  static inline size_t max(size_t a, size_t b)
  {
      if (a > b)
          return a;
      return b;
  }

  // Returns the smaller of a and b (b when they are equal).
  static inline size_t min(size_t a, size_t b)
  {
      if (a < b)
          return a;
      return b;
  }
  #endif
  198. static size_t file_output_write(void *opaque, const void *buf, size_t buf_size)
  199. {
  200. struct file_output_data *out = opaque;
  201. if (!buf_size)
  202. return 0;
  203. // Split writes into at chunks that are at most chunk_size bytes
  204. uintptr_t ptr = (uintptr_t)buf;
  205. size_t remaining = buf_size;
  206. while (remaining) {
  207. if (os_atomic_load_bool(&out->io.output_error))
  208. return 0;
  209. pthread_mutex_lock(&out->io.data_mutex);
  210. size_t next_chunk_size = min(remaining, out->io.chunk_size);
  211. // Avoid unbounded growth of the deque, cap to buffer_size
  212. size_t cap = max(out->io.data.capacity, out->io.buffer_size);
  213. size_t free_space = cap - out->io.data.size;
  214. if (free_space < next_chunk_size + sizeof(struct io_header)) {
  215. blog(LOG_DEBUG, "Waiting for I/O thread...");
  216. // No space, wait for the I/O thread to make space
  217. os_event_reset(out->io.buffer_space_available_event);
  218. pthread_mutex_unlock(&out->io.data_mutex);
  219. os_event_wait(out->io.buffer_space_available_event);
  220. continue;
  221. }
  222. // Calculate how many chunks we can fit into the buffer
  223. size_t num_chunks = free_space / (next_chunk_size + sizeof(struct io_header));
  224. while (remaining && num_chunks--) {
  225. struct io_header header = {
  226. .data_length = next_chunk_size,
  227. .seek_offset = out->io.next_pos,
  228. };
  229. // Copy the data into the buffer
  230. deque_push_back(&out->io.data, &header, sizeof(header));
  231. deque_push_back(&out->io.data, (const void *)ptr, next_chunk_size);
  232. // Advance the next write position
  233. out->io.next_pos += next_chunk_size;
  234. // Update remainder and advance data pointer
  235. remaining -= next_chunk_size;
  236. ptr += next_chunk_size;
  237. next_chunk_size = min(remaining, out->io.chunk_size);
  238. }
  239. // Tell the I/O thread that there's new data to be written
  240. os_event_signal(out->io.new_data_available_event);
  241. pthread_mutex_unlock(&out->io.data_mutex);
  242. }
  243. return buf_size - remaining;
  244. }
  245. static int64_t file_output_get_pos(void *opaque)
  246. {
  247. struct file_output_data *out = opaque;
  248. // If thread failed return -1
  249. if (os_atomic_load_bool(&out->io.output_error))
  250. return -1;
  251. return (int64_t)out->io.next_pos;
  252. }
  /*
   * Convenience wrapper around buffered_file_serializer_init() that passes 0
   * for both sizes, which selects DEFAULT_BUF_SIZE and DEFAULT_CHUNK_SIZE.
   * Returns whatever buffered_file_serializer_init() returns.
   */
  bool buffered_file_serializer_init_defaults(struct serializer *s, const char *path)
  {
      const size_t use_default_bufsize = 0;
      const size_t use_default_chunk_size = 0;

      return buffered_file_serializer_init(s, path, use_default_bufsize, use_default_chunk_size);
  }
  257. bool buffered_file_serializer_init(struct serializer *s, const char *path, size_t max_bufsize, size_t chunk_size)
  258. {
  259. struct file_output_data *out;
  260. out = bzalloc(sizeof(*out));
  261. dstr_init_copy(&out->filename, path);
  262. out->io.output_file = os_fopen(path, "wb");
  263. if (!out->io.output_file) {
  264. dstr_free(&out->filename);
  265. bfree(out);
  266. return false;
  267. }
  268. out->io.buffer_size = max_bufsize ? max_bufsize : DEFAULT_BUF_SIZE;
  269. out->io.chunk_size = chunk_size ? chunk_size : DEFAULT_CHUNK_SIZE;
  270. // Start at 1MB, this can grow up to max_bufsize depending
  271. // on how fast data is going in and out.
  272. deque_reserve(&out->io.data, 1048576);
  273. pthread_mutex_init(&out->io.data_mutex, NULL);
  274. os_event_init(&out->io.buffer_space_available_event, OS_EVENT_TYPE_AUTO);
  275. os_event_init(&out->io.new_data_available_event, OS_EVENT_TYPE_AUTO);
  276. pthread_create(&out->io.io_thread, NULL, io_thread, out);
  277. out->io.active = true;
  278. s->data = out;
  279. s->read = NULL;
  280. s->write = file_output_write;
  281. s->seek = file_output_seek;
  282. s->get_pos = file_output_get_pos;
  283. return true;
  284. }
  285. void buffered_file_serializer_free(struct serializer *s)
  286. {
  287. struct file_output_data *out = s->data;
  288. if (!out)
  289. return;
  290. if (out->io.active) {
  291. os_atomic_set_bool(&out->io.shutdown_requested, true);
  292. // Wakes up the I/O thread and waits for it to finish
  293. pthread_mutex_lock(&out->io.data_mutex);
  294. os_event_signal(out->io.new_data_available_event);
  295. pthread_mutex_unlock(&out->io.data_mutex);
  296. pthread_join(out->io.io_thread, NULL);
  297. os_event_destroy(out->io.new_data_available_event);
  298. os_event_destroy(out->io.buffer_space_available_event);
  299. pthread_mutex_destroy(&out->io.data_mutex);
  300. blog(LOG_DEBUG, "Final buffer capacity: %zu KiB", out->io.data.capacity / 1024);
  301. deque_free(&out->io.data);
  302. }
  303. dstr_free(&out->filename);
  304. bfree(out);
  305. }