buffered-file-serializer.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. /*
  2. * Copyright (c) 2024 Dennis Sädtler <[email protected]>
  3. *
  4. * Permission to use, copy, modify, and distribute this software for any
  5. * purpose with or without fee is hereby granted, provided that the above
  6. * copyright notice and this permission notice appear in all copies.
  7. *
  8. * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9. * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  10. * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  11. * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  12. * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  13. * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  14. * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. */
  16. #include "buffered-file-serializer.h"
  17. #include <inttypes.h>
  18. #include "platform.h"
  19. #include "threading.h"
  20. #include "deque.h"
  21. #include "dstr.h"
  /* Buffer limit used when the caller passes 0 as max_bufsize: 256 MiB. */
  static const size_t DEFAULT_BUF_SIZE = (size_t)256 * 1024 * 1024;
  /* Chunk size used when the caller passes 0 as chunk_size: 1 MiB. */
  static const size_t DEFAULT_CHUNK_SIZE = (size_t)1024 * 1024;
  24. /* ========================================================================== */
  25. /* Buffered writer based on ffmpeg-mux implementation */
  /*
   * Record header stored in the deque directly in front of each payload.
   * The producer pushes one header followed by `data_length` payload bytes;
   * the I/O thread pops them back in the same order.
   */
  struct io_header {
      /* Absolute file offset at which the payload must be written */
      uint64_t seek_offset;
      /* Number of payload bytes that follow this header in the deque */
      uint64_t data_length;
  };
  30. struct io_buffer {
  31. bool active;
  32. bool shutdown_requested;
  33. bool output_error;
  34. os_event_t *buffer_space_available_event;
  35. os_event_t *new_data_available_event;
  36. pthread_t io_thread;
  37. pthread_mutex_t data_mutex;
  38. FILE *output_file;
  39. struct deque data;
  40. uint64_t next_pos;
  41. size_t buffer_size;
  42. size_t chunk_size;
  43. };
  44. struct file_output_data {
  45. struct dstr filename;
  46. struct io_buffer io;
  47. };
  48. static void *io_thread(void *opaque)
  49. {
  50. struct file_output_data *out = opaque;
  51. os_set_thread_name("buffered writer i/o thread");
  52. // Chunk collects the writes into a larger batch
  53. size_t chunk_used = 0;
  54. size_t chunk_size = out->io.chunk_size;
  55. unsigned char *chunk = bmalloc(chunk_size);
  56. if (!chunk) {
  57. os_atomic_set_bool(&out->io.output_error, true);
  58. fprintf(stderr, "Error allocating memory for output\n");
  59. goto error;
  60. }
  61. bool shutting_down;
  62. bool want_seek = false;
  63. bool force_flush_chunk = false;
  64. // current_seek_position is a virtual position updated as we read from
  65. // the buffer, if it becomes discontinuous due to a seek request we
  66. // flush the chunk. next_seek_position is the actual offset we should
  67. // seek to when we write the chunk.
  68. uint64_t current_seek_position = 0;
  69. uint64_t next_seek_position;
  70. for (;;) {
  71. // Wait for data to be written to the buffer
  72. os_event_wait(out->io.new_data_available_event);
  73. // Loop to write in chunk_size chunks
  74. for (;;) {
  75. pthread_mutex_lock(&out->io.data_mutex);
  76. shutting_down = os_atomic_load_bool(
  77. &out->io.shutdown_requested);
  78. // Fetch as many writes as possible from the deque
  79. // and fill up our local chunk. This may involve
  80. // seeking, so take care of that as well.
  81. for (;;) {
  82. size_t available = out->io.data.size;
  83. // Buffer is empty (now) or was already empty (we got
  84. // woken up to exit)
  85. if (!available)
  86. break;
  87. // Get seek offset and data size
  88. struct io_header header;
  89. deque_peek_front(&out->io.data, &header,
  90. sizeof(header));
  91. // Do we need to seek?
  92. if (header.seek_offset !=
  93. current_seek_position) {
  94. // If there's already part of a chunk pending,
  95. // flush it at the current offset. Similarly,
  96. // if we already plan to seek, then seek.
  97. if (chunk_used || want_seek) {
  98. force_flush_chunk = true;
  99. break;
  100. }
  101. // Mark that we need to seek and where to
  102. want_seek = true;
  103. next_seek_position = header.seek_offset;
  104. // Update our virtual position
  105. current_seek_position =
  106. header.seek_offset;
  107. }
  108. // Make sure there's enough room for the data, if
  109. // not then force a flush
  110. if (header.data_length + chunk_used >
  111. chunk_size) {
  112. force_flush_chunk = true;
  113. break;
  114. }
  115. // Remove header that we already read
  116. deque_pop_front(&out->io.data, NULL,
  117. sizeof(header));
  118. // Copy from the buffer to our local chunk
  119. deque_pop_front(&out->io.data,
  120. chunk + chunk_used,
  121. header.data_length);
  122. // Update offsets
  123. chunk_used += header.data_length;
  124. current_seek_position += header.data_length;
  125. }
  126. // Signal that there is more room in the buffer
  127. os_event_signal(out->io.buffer_space_available_event);
  128. // Try to avoid lots of small writes unless this was the final
  129. // data left in the buffer. The buffer might be entirely empty
  130. // if we were woken up to exit.
  131. if (!force_flush_chunk &&
  132. (!chunk_used ||
  133. (chunk_used < 65536 && !shutting_down))) {
  134. os_event_reset(
  135. out->io.new_data_available_event);
  136. pthread_mutex_unlock(&out->io.data_mutex);
  137. break;
  138. }
  139. pthread_mutex_unlock(&out->io.data_mutex);
  140. // Seek if we need to
  141. if (want_seek) {
  142. os_fseeki64(out->io.output_file,
  143. next_seek_position, SEEK_SET);
  144. // Update the next virtual position, making sure to take
  145. // into account the size of the chunk we're about to write.
  146. current_seek_position =
  147. next_seek_position + chunk_used;
  148. want_seek = false;
  149. // If we did a seek but do not have any data left to write
  150. // return to the start of the loop.
  151. if (!chunk_used) {
  152. force_flush_chunk = false;
  153. continue;
  154. }
  155. }
  156. // Write the current chunk to the output file
  157. size_t bytes_written = fwrite(chunk, 1, chunk_used,
  158. out->io.output_file);
  159. if (bytes_written != chunk_used) {
  160. blog(LOG_ERROR,
  161. "Error writing to '%s': %s (%zu != %zu)\n",
  162. out->filename.array, strerror(errno),
  163. bytes_written, chunk_used);
  164. os_atomic_set_bool(&out->io.output_error, true);
  165. goto error;
  166. }
  167. chunk_used = 0;
  168. force_flush_chunk = false;
  169. }
  170. // If this was the last chunk, time to exit
  171. if (shutting_down)
  172. break;
  173. }
  174. error:
  175. if (chunk)
  176. bfree(chunk);
  177. fclose(out->io.output_file);
  178. return NULL;
  179. }
  180. /* ========================================================================== */
  181. /* Serializer Implementation */
  182. static int64_t file_output_seek(void *opaque, int64_t offset,
  183. enum serialize_seek_type seek_type)
  184. {
  185. struct file_output_data *out = opaque;
  186. // If the output thread failed, signal that back up the stack
  187. if (os_atomic_load_bool(&out->io.output_error))
  188. return -1;
  189. // Update where the next write should go
  190. pthread_mutex_lock(&out->io.data_mutex);
  191. switch (seek_type) {
  192. case SERIALIZE_SEEK_START:
  193. out->io.next_pos = offset;
  194. break;
  195. case SERIALIZE_SEEK_CURRENT:
  196. out->io.next_pos += offset;
  197. break;
  198. case SERIALIZE_SEEK_END:
  199. out->io.next_pos -= offset;
  200. break;
  201. }
  202. pthread_mutex_unlock(&out->io.data_mutex);
  203. return (int64_t)out->io.next_pos;
  204. }
  #ifndef _WIN32
  /* Returns the larger of two sizes. */
  static inline size_t max(size_t lhs, size_t rhs)
  {
      if (lhs > rhs)
          return lhs;
      return rhs;
  }

  /* Returns the smaller of two sizes. */
  static inline size_t min(size_t lhs, size_t rhs)
  {
      if (lhs < rhs)
          return lhs;
      return rhs;
  }
  #endif
  215. static size_t file_output_write(void *opaque, const void *buf, size_t buf_size)
  216. {
  217. struct file_output_data *out = opaque;
  218. if (!buf_size)
  219. return 0;
  220. // Split writes into at chunks that are at most chunk_size bytes
  221. uintptr_t ptr = (uintptr_t)buf;
  222. size_t remaining = buf_size;
  223. while (remaining) {
  224. if (os_atomic_load_bool(&out->io.output_error))
  225. return 0;
  226. pthread_mutex_lock(&out->io.data_mutex);
  227. size_t next_chunk_size = min(remaining, out->io.chunk_size);
  228. // Avoid unbounded growth of the deque, cap to buffer_size
  229. size_t cap = max(out->io.data.capacity, out->io.buffer_size);
  230. size_t free_space = cap - out->io.data.size;
  231. if (free_space < next_chunk_size + sizeof(struct io_header)) {
  232. blog(LOG_DEBUG, "Waiting for I/O thread...");
  233. // No space, wait for the I/O thread to make space
  234. os_event_reset(out->io.buffer_space_available_event);
  235. pthread_mutex_unlock(&out->io.data_mutex);
  236. os_event_wait(out->io.buffer_space_available_event);
  237. continue;
  238. }
  239. // Calculate how many chunks we can fit into the buffer
  240. size_t num_chunks = free_space / (next_chunk_size +
  241. sizeof(struct io_header));
  242. while (remaining && num_chunks--) {
  243. struct io_header header = {
  244. .data_length = next_chunk_size,
  245. .seek_offset = out->io.next_pos,
  246. };
  247. // Copy the data into the buffer
  248. deque_push_back(&out->io.data, &header, sizeof(header));
  249. deque_push_back(&out->io.data, (const void *)ptr,
  250. next_chunk_size);
  251. // Advance the next write position
  252. out->io.next_pos += next_chunk_size;
  253. // Update remainder and advance data pointer
  254. remaining -= next_chunk_size;
  255. ptr += next_chunk_size;
  256. next_chunk_size = min(remaining, out->io.chunk_size);
  257. }
  258. // Tell the I/O thread that there's new data to be written
  259. os_event_signal(out->io.new_data_available_event);
  260. pthread_mutex_unlock(&out->io.data_mutex);
  261. }
  262. return buf_size - remaining;
  263. }
  264. static int64_t file_output_get_pos(void *opaque)
  265. {
  266. struct file_output_data *out = opaque;
  267. // If thread failed return -1
  268. if (os_atomic_load_bool(&out->io.output_error))
  269. return -1;
  270. return (int64_t)out->io.next_pos;
  271. }
  /*
   * Convenience wrapper around buffered_file_serializer_init() that selects
   * the default buffer and chunk sizes.
   *
   * @param s     Serializer to set up.
   * @param path  Path of the file to create.
   * @return      Whatever buffered_file_serializer_init() returns.
   */
  bool buffered_file_serializer_init_defaults(struct serializer *s,
                                              const char *path)
  {
      // A size of 0 makes buffered_file_serializer_init() use its default
      const size_t use_default = 0;

      return buffered_file_serializer_init(s, path, use_default,
                                           use_default);
  }
  277. bool buffered_file_serializer_init(struct serializer *s, const char *path,
  278. size_t max_bufsize, size_t chunk_size)
  279. {
  280. struct file_output_data *out;
  281. out = bzalloc(sizeof(*out));
  282. dstr_init_copy(&out->filename, path);
  283. out->io.output_file = os_fopen(path, "wb");
  284. if (!out->io.output_file)
  285. return false;
  286. out->io.buffer_size = max_bufsize ? max_bufsize : DEFAULT_BUF_SIZE;
  287. out->io.chunk_size = chunk_size ? chunk_size : DEFAULT_CHUNK_SIZE;
  288. // Start at 1MB, this can grow up to max_bufsize depending
  289. // on how fast data is going in and out.
  290. deque_reserve(&out->io.data, 1048576);
  291. pthread_mutex_init(&out->io.data_mutex, NULL);
  292. os_event_init(&out->io.buffer_space_available_event,
  293. OS_EVENT_TYPE_AUTO);
  294. os_event_init(&out->io.new_data_available_event, OS_EVENT_TYPE_AUTO);
  295. pthread_create(&out->io.io_thread, NULL, io_thread, out);
  296. out->io.active = true;
  297. s->data = out;
  298. s->read = NULL;
  299. s->write = file_output_write;
  300. s->seek = file_output_seek;
  301. s->get_pos = file_output_get_pos;
  302. return true;
  303. }
  304. void buffered_file_serializer_free(struct serializer *s)
  305. {
  306. struct file_output_data *out = s->data;
  307. if (!out)
  308. return;
  309. if (out->io.active) {
  310. os_atomic_set_bool(&out->io.shutdown_requested, true);
  311. // Wakes up the I/O thread and waits for it to finish
  312. pthread_mutex_lock(&out->io.data_mutex);
  313. os_event_signal(out->io.new_data_available_event);
  314. pthread_mutex_unlock(&out->io.data_mutex);
  315. pthread_join(out->io.io_thread, NULL);
  316. os_event_destroy(out->io.new_data_available_event);
  317. os_event_destroy(out->io.buffer_space_available_event);
  318. pthread_mutex_destroy(&out->io.data_mutex);
  319. blog(LOG_DEBUG, "Final buffer capacity: %zu KiB",
  320. out->io.data.capacity / 1024);
  321. deque_free(&out->io.data);
  322. }
  323. dstr_free(&out->filename);
  324. bfree(out);
  325. }