obs-ffmpeg-hls-mux.c 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. #include "obs-ffmpeg-mux.h"
  2. #include <obs-avc.h>
  3. #ifdef ENABLE_HEVC
  4. #include <obs-hevc.h>
  5. #endif
  /*
   * Logging helpers for this output.
   *
   * Every message is prefixed with "[ffmpeg hls muxer: '<output name>'] ".
   * The expansion reads `stream->output`, so these macros can only be used
   * where a variable named `stream` is in scope.
   */
  #define do_log(level, fmt, ...)                       \
      blog(level, "[ffmpeg hls muxer: '%s'] " fmt,      \
           obs_output_get_name(stream->output), ##__VA_ARGS__)

  #define warn(fmt, ...) do_log(LOG_WARNING, fmt, ##__VA_ARGS__)
  #define info(fmt, ...) do_log(LOG_INFO, fmt, ##__VA_ARGS__)
  /*
   * get_name callback: returns the module text stored under the
   * "FFmpegHlsMuxer" key. The `type` argument is not used.
   */
  const char *ffmpeg_hls_mux_getname(void *type)
  {
      const char *display_name = obs_module_text("FFmpegHlsMuxer");

      UNUSED_PARAMETER(type);
      return display_name;
  }
  16. int hls_stream_dropped_frames(void *data)
  17. {
  18. struct ffmpeg_muxer *stream = data;
  19. return stream->dropped_frames;
  20. }
  21. void ffmpeg_hls_mux_destroy(void *data)
  22. {
  23. struct ffmpeg_muxer *stream = data;
  24. if (stream) {
  25. deactivate(stream, 0);
  26. pthread_mutex_destroy(&stream->write_mutex);
  27. os_sem_destroy(stream->write_sem);
  28. os_event_destroy(stream->stop_event);
  29. da_free(stream->mux_packets);
  30. circlebuf_free(&stream->packets);
  31. os_process_pipe_destroy(stream->pipe);
  32. dstr_free(&stream->path);
  33. dstr_free(&stream->printable_path);
  34. dstr_free(&stream->stream_key);
  35. dstr_free(&stream->muxer_settings);
  36. bfree(data);
  37. }
  38. }
  39. void *ffmpeg_hls_mux_create(obs_data_t *settings, obs_output_t *output)
  40. {
  41. struct ffmpeg_muxer *stream = bzalloc(sizeof(*stream));
  42. pthread_mutex_init_value(&stream->write_mutex);
  43. stream->output = output;
  44. /* init mutex, semaphore and event */
  45. if (pthread_mutex_init(&stream->write_mutex, NULL) != 0)
  46. goto fail;
  47. if (os_event_init(&stream->stop_event, OS_EVENT_TYPE_AUTO) != 0)
  48. goto fail;
  49. if (os_sem_init(&stream->write_sem, 0) != 0)
  50. goto fail;
  51. UNUSED_PARAMETER(settings);
  52. return stream;
  53. fail:
  54. ffmpeg_hls_mux_destroy(stream);
  55. return NULL;
  56. }
  57. static bool process_packet(struct ffmpeg_muxer *stream)
  58. {
  59. struct encoder_packet packet;
  60. bool has_packet = false;
  61. bool ret = true;
  62. pthread_mutex_lock(&stream->write_mutex);
  63. if (stream->packets.size) {
  64. circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
  65. has_packet = true;
  66. }
  67. pthread_mutex_unlock(&stream->write_mutex);
  68. if (has_packet) {
  69. ret = write_packet(stream, &packet);
  70. obs_encoder_packet_release(&packet);
  71. }
  72. return ret;
  73. }
  74. static void *write_thread(void *data)
  75. {
  76. struct ffmpeg_muxer *stream = data;
  77. while (os_sem_wait(stream->write_sem) == 0) {
  78. if (os_event_try(stream->stop_event) == 0)
  79. return NULL;
  80. if (!process_packet(stream))
  81. break;
  82. }
  83. obs_output_signal_stop(stream->output, OBS_OUTPUT_ERROR);
  84. deactivate(stream, 0);
  85. return NULL;
  86. }
  87. bool ffmpeg_hls_mux_start(void *data)
  88. {
  89. struct ffmpeg_muxer *stream = data;
  90. obs_service_t *service;
  91. const char *path_str;
  92. const char *stream_key;
  93. struct dstr path = {0};
  94. obs_encoder_t *vencoder;
  95. obs_data_t *settings;
  96. int keyint_sec;
  97. if (!obs_output_can_begin_data_capture(stream->output, 0))
  98. return false;
  99. if (!obs_output_initialize_encoders(stream->output, 0))
  100. return false;
  101. service = obs_output_get_service(stream->output);
  102. if (!service)
  103. return false;
  104. path_str = obs_service_get_connect_info(
  105. service, OBS_SERVICE_CONNECT_INFO_SERVER_URL);
  106. stream_key = obs_service_get_connect_info(
  107. service, OBS_SERVICE_CONNECT_INFO_STREAM_KEY);
  108. dstr_copy(&stream->stream_key, stream_key);
  109. dstr_copy(&path, path_str);
  110. dstr_replace(&path, "{stream_key}", stream_key);
  111. dstr_copy(&stream->muxer_settings,
  112. "method=PUT http_persistent=1 ignore_io_errors=1 ");
  113. dstr_catf(&stream->muxer_settings, "http_user_agent=libobs/%s",
  114. OBS_VERSION);
  115. vencoder = obs_output_get_video_encoder(stream->output);
  116. settings = obs_encoder_get_settings(vencoder);
  117. keyint_sec = (int)obs_data_get_int(settings, "keyint_sec");
  118. if (keyint_sec) {
  119. dstr_catf(&stream->muxer_settings, " hls_time=%d", keyint_sec);
  120. stream->keyint_sec = keyint_sec;
  121. }
  122. obs_data_release(settings);
  123. start_pipe(stream, path.array);
  124. dstr_free(&path);
  125. if (!stream->pipe) {
  126. obs_output_set_last_error(
  127. stream->output, obs_module_text("HelperProcessFailed"));
  128. warn("Failed to create process pipe");
  129. return false;
  130. }
  131. stream->mux_thread_joinable = pthread_create(&stream->mux_thread, NULL,
  132. write_thread, stream) == 0;
  133. if (!stream->mux_thread_joinable)
  134. return false;
  135. /* write headers and start capture */
  136. os_atomic_set_bool(&stream->active, true);
  137. os_atomic_set_bool(&stream->capturing, true);
  138. stream->is_hls = true;
  139. stream->total_bytes = 0;
  140. stream->dropped_frames = 0;
  141. stream->min_priority = 0;
  142. obs_output_begin_data_capture(stream->output, 0);
  143. dstr_copy(&stream->printable_path, path_str);
  144. info("Writing to path '%s'...", stream->printable_path.array);
  145. return true;
  146. }
  147. static bool write_packet_to_buf(struct ffmpeg_muxer *stream,
  148. struct encoder_packet *packet)
  149. {
  150. circlebuf_push_back(&stream->packets, packet,
  151. sizeof(struct encoder_packet));
  152. return true;
  153. }
  154. static void drop_frames(struct ffmpeg_muxer *stream, int highest_priority)
  155. {
  156. struct circlebuf new_buf = {0};
  157. int num_frames_dropped = 0;
  158. circlebuf_reserve(&new_buf, sizeof(struct encoder_packet) * 8);
  159. while (stream->packets.size) {
  160. struct encoder_packet packet;
  161. circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
  162. /* do not drop audio data or video keyframes */
  163. if (packet.type == OBS_ENCODER_AUDIO ||
  164. packet.drop_priority >= highest_priority) {
  165. circlebuf_push_back(&new_buf, &packet, sizeof(packet));
  166. } else {
  167. num_frames_dropped++;
  168. obs_encoder_packet_release(&packet);
  169. }
  170. }
  171. circlebuf_free(&stream->packets);
  172. stream->packets = new_buf;
  173. if (stream->min_priority < highest_priority)
  174. stream->min_priority = highest_priority;
  175. stream->dropped_frames += num_frames_dropped;
  176. }
  177. static bool find_first_video_packet(struct ffmpeg_muxer *stream,
  178. struct encoder_packet *first)
  179. {
  180. size_t count = stream->packets.size / sizeof(*first);
  181. for (size_t i = 0; i < count; i++) {
  182. struct encoder_packet *cur =
  183. circlebuf_data(&stream->packets, i * sizeof(*first));
  184. if (cur->type == OBS_ENCODER_VIDEO && !cur->keyframe) {
  185. *first = *cur;
  186. return true;
  187. }
  188. }
  189. return false;
  190. }
  191. void check_to_drop_frames(struct ffmpeg_muxer *stream, bool pframes)
  192. {
  193. struct encoder_packet first;
  194. int64_t buffer_duration_usec;
  195. int priority = pframes ? OBS_NAL_PRIORITY_HIGHEST
  196. : OBS_NAL_PRIORITY_HIGH;
  197. int keyint_sec = stream->keyint_sec;
  198. int64_t drop_threshold_sec = keyint_sec ? 2 * keyint_sec : 10;
  199. if (!find_first_video_packet(stream, &first))
  200. return;
  201. buffer_duration_usec = stream->last_dts_usec - first.dts_usec;
  202. if (buffer_duration_usec > drop_threshold_sec * 1000000)
  203. drop_frames(stream, priority);
  204. }
  205. static bool add_video_packet(struct ffmpeg_muxer *stream,
  206. struct encoder_packet *packet)
  207. {
  208. check_to_drop_frames(stream, false);
  209. check_to_drop_frames(stream, true);
  210. /* if currently dropping frames, drop packets until it reaches the
  211. * desired priority */
  212. if (packet->drop_priority < stream->min_priority) {
  213. stream->dropped_frames++;
  214. return false;
  215. } else {
  216. stream->min_priority = 0;
  217. }
  218. stream->last_dts_usec = packet->dts_usec;
  219. return write_packet_to_buf(stream, packet);
  220. }
  221. void ffmpeg_hls_mux_data(void *data, struct encoder_packet *packet)
  222. {
  223. struct ffmpeg_muxer *stream = data;
  224. struct encoder_packet new_packet;
  225. bool added_packet = false;
  226. if (!active(stream))
  227. return;
  228. /* encoder failure */
  229. if (!packet) {
  230. deactivate(stream, OBS_OUTPUT_ENCODE_ERROR);
  231. return;
  232. }
  233. if (!stream->sent_headers) {
  234. if (!send_headers(stream))
  235. return;
  236. stream->sent_headers = true;
  237. }
  238. if (stopping(stream)) {
  239. if (packet->sys_dts_usec >= stream->stop_ts) {
  240. deactivate(stream, 0);
  241. return;
  242. }
  243. }
  244. if (packet->type == OBS_ENCODER_VIDEO) {
  245. const char *const codec =
  246. obs_encoder_get_codec(packet->encoder);
  247. if (strcmp(codec, "h264") == 0) {
  248. packet->drop_priority =
  249. obs_parse_avc_packet_priority(packet);
  250. }
  251. #ifdef ENABLE_HEVC
  252. else if (strcmp(codec, "hevc") == 0) {
  253. packet->drop_priority =
  254. obs_parse_hevc_packet_priority(packet);
  255. }
  256. #endif
  257. }
  258. obs_encoder_packet_ref(&new_packet, packet);
  259. pthread_mutex_lock(&stream->write_mutex);
  260. if (active(stream)) {
  261. added_packet =
  262. (packet->type == OBS_ENCODER_VIDEO)
  263. ? add_video_packet(stream, &new_packet)
  264. : write_packet_to_buf(stream, &new_packet);
  265. }
  266. pthread_mutex_unlock(&stream->write_mutex);
  267. if (added_packet)
  268. os_sem_post(stream->write_sem);
  269. else
  270. obs_encoder_packet_release(&new_packet);
  271. }
  272. struct obs_output_info ffmpeg_hls_muxer = {
  273. .id = "ffmpeg_hls_muxer",
  274. .flags = OBS_OUTPUT_AV | OBS_OUTPUT_ENCODED | OBS_OUTPUT_MULTI_TRACK |
  275. OBS_OUTPUT_SERVICE,
  276. .protocols = "HLS",
  277. #ifdef ENABLE_HEVC
  278. .encoded_video_codecs = "h264;hevc",
  279. #else
  280. .encoded_video_codecs = "h264",
  281. #endif
  282. .encoded_audio_codecs = "aac",
  283. .get_name = ffmpeg_hls_mux_getname,
  284. .create = ffmpeg_hls_mux_create,
  285. .destroy = ffmpeg_hls_mux_destroy,
  286. .start = ffmpeg_hls_mux_start,
  287. .stop = ffmpeg_mux_stop,
  288. .encoded_packet = ffmpeg_hls_mux_data,
  289. .get_total_bytes = ffmpeg_mux_total_bytes,
  290. .get_dropped_frames = hls_stream_dropped_frames,
  291. };