ftl-stream.c 30 KB


  1. /******************************************************************************
  2. Copyright (C) 2017 by Quinn Damerell <[email protected]>
  3. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 2 of the License, or
  6. (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ******************************************************************************/
  14. #include <obs-module.h>
  15. #include <obs-avc.h>
  16. #include <util/platform.h>
  17. #include <util/circlebuf.h>
  18. #include <util/dstr.h>
  19. #include <util/threading.h>
  20. #include <inttypes.h>
  21. #include "ftl.h"
  22. #include "flv-mux.h"
  23. #include "net-if.h"
  24. #ifdef _WIN32
  25. #include <Iphlpapi.h>
  26. #else
  27. #include <sys/ioctl.h>
  28. #define INFINITE 0xFFFFFFFF
  29. #endif
  30. #define do_log(level, format, ...) \
  31. blog(level, "[ftl stream: '%s'] " format, \
  32. obs_output_get_name(stream->output), ##__VA_ARGS__)
  33. #define warn(format, ...) do_log(LOG_WARNING, format, ##__VA_ARGS__)
  34. #define info(format, ...) do_log(LOG_INFO, format, ##__VA_ARGS__)
  35. #define debug(format, ...) do_log(LOG_DEBUG, format, ##__VA_ARGS__)
  36. #define OPT_DROP_THRESHOLD "drop_threshold_ms"
  37. #define OPT_MAX_SHUTDOWN_TIME_SEC "max_shutdown_time_sec"
  38. #define OPT_BIND_IP "bind_ip"
  39. typedef struct _nalu_t {
  40. int len;
  41. int dts_usec;
  42. int send_marker_bit;
  43. uint8_t *data;
  44. } nalu_t;
  45. typedef struct _frame_of_nalus_t {
  46. nalu_t nalus[100];
  47. int total;
  48. int complete_frame;
  49. } frame_of_nalus_t;
  50. struct ftl_stream {
  51. obs_output_t *output;
  52. pthread_mutex_t packets_mutex;
  53. struct circlebuf packets;
  54. bool sent_headers;
  55. int64_t frames_sent;
  56. volatile bool connecting;
  57. pthread_t connect_thread;
  58. pthread_t status_thread;
  59. volatile bool active;
  60. volatile bool disconnected;
  61. pthread_t send_thread;
  62. int max_shutdown_time_sec;
  63. os_sem_t *send_sem;
  64. os_event_t *stop_event;
  65. uint64_t stop_ts;
  66. uint64_t shutdown_timeout_ts;
  67. struct dstr path;
  68. uint32_t channel_id;
  69. struct dstr username, password;
  70. struct dstr encoder_name;
  71. struct dstr bind_ip;
  72. /* frame drop variables */
  73. int64_t drop_threshold_usec;
  74. int64_t pframe_drop_threshold_usec;
  75. int min_priority;
  76. float congestion;
  77. int64_t last_dts_usec;
  78. uint64_t total_bytes_sent;
  79. uint64_t dropped_frames;
  80. uint64_t last_nack_count;
  81. ftl_handle_t ftl_handle;
  82. ftl_ingest_params_t params;
  83. int peak_kbps;
  84. uint32_t scale_width, scale_height, width, height;
  85. frame_of_nalus_t coded_pic_buffer;
  86. };
  87. static void log_libftl_messages(ftl_log_severity_t log_level,
  88. const char *message);
  89. static int init_connect(struct ftl_stream *stream);
  90. static void *connect_thread(void *data);
  91. static void *status_thread(void *data);
  92. static int _ftl_error_to_obs_error(int status);
  93. static const char *ftl_stream_getname(void *unused)
  94. {
  95. UNUSED_PARAMETER(unused);
  96. return obs_module_text("FTLStream");
  97. }
  98. static void log_ftl(int level, const char *format, va_list args)
  99. {
  100. blogva(LOG_INFO, format, args);
  101. }
  102. static inline size_t num_buffered_packets(struct ftl_stream *stream);
  103. static inline void free_packets(struct ftl_stream *stream)
  104. {
  105. size_t num_packets;
  106. pthread_mutex_lock(&stream->packets_mutex);
  107. num_packets = num_buffered_packets(stream);
  108. if (num_packets)
  109. info("Freeing %d remaining packets", (int)num_packets);
  110. while (stream->packets.size) {
  111. struct encoder_packet packet;
  112. circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
  113. obs_encoder_packet_release(&packet);
  114. }
  115. pthread_mutex_unlock(&stream->packets_mutex);
  116. }
  117. static inline bool stopping(struct ftl_stream *stream)
  118. {
  119. return os_event_try(stream->stop_event) != EAGAIN;
  120. }
  121. static inline bool connecting(struct ftl_stream *stream)
  122. {
  123. return os_atomic_load_bool(&stream->connecting);
  124. }
  125. static inline bool active(struct ftl_stream *stream)
  126. {
  127. return os_atomic_load_bool(&stream->active);
  128. }
  129. static inline bool disconnected(struct ftl_stream *stream)
  130. {
  131. return os_atomic_load_bool(&stream->disconnected);
  132. }
  133. static void ftl_stream_destroy(void *data)
  134. {
  135. struct ftl_stream *stream = data;
  136. ftl_status_t status_code;
  137. info("ftl_stream_destroy");
  138. if (stopping(stream) && !connecting(stream)) {
  139. pthread_join(stream->send_thread, NULL);
  140. } else if (connecting(stream) || active(stream)) {
  141. if (stream->connecting) {
  142. info("wait for connect_thread to terminate");
  143. pthread_join(stream->status_thread, NULL);
  144. pthread_join(stream->connect_thread, NULL);
  145. info("wait for connect_thread to terminate: done");
  146. }
  147. stream->stop_ts = 0;
  148. os_event_signal(stream->stop_event);
  149. if (active(stream)) {
  150. os_sem_post(stream->send_sem);
  151. obs_output_end_data_capture(stream->output);
  152. pthread_join(stream->send_thread, NULL);
  153. }
  154. }
  155. info("ingest destroy");
  156. status_code = ftl_ingest_destroy(&stream->ftl_handle);
  157. if (status_code != FTL_SUCCESS) {
  158. info("Failed to destroy from ingest %d", status_code);
  159. }
  160. if (stream) {
  161. free_packets(stream);
  162. dstr_free(&stream->path);
  163. dstr_free(&stream->username);
  164. dstr_free(&stream->password);
  165. dstr_free(&stream->encoder_name);
  166. dstr_free(&stream->bind_ip);
  167. os_event_destroy(stream->stop_event);
  168. os_sem_destroy(stream->send_sem);
  169. pthread_mutex_destroy(&stream->packets_mutex);
  170. circlebuf_free(&stream->packets);
  171. bfree(stream);
  172. }
  173. }
  174. static void *ftl_stream_create(obs_data_t *settings, obs_output_t *output)
  175. {
  176. struct ftl_stream *stream = bzalloc(sizeof(struct ftl_stream));
  177. info("ftl_stream_create");
  178. stream->output = output;
  179. pthread_mutex_init_value(&stream->packets_mutex);
  180. stream->peak_kbps = -1;
  181. ftl_init();
  182. if (pthread_mutex_init(&stream->packets_mutex, NULL) != 0) {
  183. goto fail;
  184. }
  185. if (os_event_init(&stream->stop_event, OS_EVENT_TYPE_MANUAL) != 0) {
  186. goto fail;
  187. }
  188. stream->coded_pic_buffer.total = 0;
  189. stream->coded_pic_buffer.complete_frame = 0;
  190. UNUSED_PARAMETER(settings);
  191. return stream;
  192. fail:
  193. return NULL;
  194. }
  195. static void ftl_stream_stop(void *data, uint64_t ts)
  196. {
  197. struct ftl_stream *stream = data;
  198. info("ftl_stream_stop");
  199. if (stopping(stream) && ts != 0) {
  200. return;
  201. }
  202. if (connecting(stream)) {
  203. pthread_join(stream->status_thread, NULL);
  204. pthread_join(stream->connect_thread, NULL);
  205. }
  206. stream->stop_ts = ts / 1000ULL;
  207. os_event_signal(stream->stop_event);
  208. if (ts) {
  209. stream->shutdown_timeout_ts = ts +
  210. (uint64_t)stream->max_shutdown_time_sec * 1000000000ULL;
  211. }
  212. if (active(stream)) {
  213. if (stream->stop_ts == 0)
  214. os_sem_post(stream->send_sem);
  215. }
  216. }
  217. static inline bool get_next_packet(struct ftl_stream *stream,
  218. struct encoder_packet *packet)
  219. {
  220. bool new_packet = false;
  221. pthread_mutex_lock(&stream->packets_mutex);
  222. if (stream->packets.size) {
  223. circlebuf_pop_front(&stream->packets, packet,
  224. sizeof(struct encoder_packet));
  225. new_packet = true;
  226. }
  227. pthread_mutex_unlock(&stream->packets_mutex);
  228. return new_packet;
  229. }
  230. static int avc_get_video_frame(struct ftl_stream *stream,
  231. struct encoder_packet *packet, bool is_header, size_t idx)
  232. {
  233. int consumed = 0;
  234. int len = (int)packet->size;
  235. nalu_t *nalu;
  236. unsigned char *video_stream = packet->data;
  237. while (consumed < packet->size) {
  238. size_t total_max = sizeof(stream->coded_pic_buffer.nalus) /
  239. sizeof(stream->coded_pic_buffer.nalus[0]);
  240. if (stream->coded_pic_buffer.total >= total_max) {
  241. warn("ERROR: cannot continue, nalu buffers are full");
  242. return -1;
  243. }
  244. nalu = &stream->coded_pic_buffer.nalus
  245. [stream->coded_pic_buffer.total];
  246. if (is_header) {
  247. if (consumed == 0) {
  248. //first 6 bytes are some obs header with part
  249. //of the sps
  250. video_stream += 6;
  251. consumed += 6;
  252. } else {
  253. //another spacer byte of 0x1
  254. video_stream += 1;
  255. consumed += 1;
  256. }
  257. len = video_stream[0] << 8 | video_stream[1];
  258. video_stream += 2;
  259. consumed += 2;
  260. } else {
  261. len = video_stream[0] << 24 |
  262. video_stream[1] << 16 |
  263. video_stream[2] << 8 |
  264. video_stream[3];
  265. if (len > (packet->size - consumed)) {
  266. warn("ERROR: got len of %d but packet only "
  267. "has %d left",
  268. len, packet->size - consumed);
  269. }
  270. consumed += 4;
  271. video_stream += 4;
  272. }
  273. consumed += len;
  274. uint8_t nalu_type = video_stream[0] & 0x1F;
  275. uint8_t nri = (video_stream[0] >> 5) & 0x3;
  276. int send_marker_bit = (consumed >= packet->size) && !is_header;
  277. if ((nalu_type != 12 && nalu_type != 6 && nalu_type != 9) ||
  278. nri) {
  279. nalu->data = video_stream;
  280. nalu->len = len;
  281. nalu->send_marker_bit = 0;
  282. stream->coded_pic_buffer.total++;
  283. }
  284. video_stream += len;
  285. }
  286. if (!is_header) {
  287. size_t idx = stream->coded_pic_buffer.total - 1;
  288. stream->coded_pic_buffer.nalus[idx].send_marker_bit = 1;
  289. }
  290. return 0;
  291. }
  292. static int send_packet(struct ftl_stream *stream,
  293. struct encoder_packet *packet, bool is_header, size_t idx)
  294. {
  295. int bytes_sent = 0;
  296. int recv_size = 0;
  297. int ret = 0;
  298. if (packet->type == OBS_ENCODER_VIDEO) {
  299. stream->coded_pic_buffer.total = 0;
  300. avc_get_video_frame(stream, packet, is_header, idx);
  301. int i;
  302. for (i = 0; i < stream->coded_pic_buffer.total; i++) {
  303. nalu_t *nalu = &stream->coded_pic_buffer.nalus[i];
  304. bytes_sent += ftl_ingest_send_media_dts(
  305. &stream->ftl_handle,
  306. FTL_VIDEO_DATA,
  307. packet->dts_usec,
  308. nalu->data,
  309. nalu->len,
  310. nalu->send_marker_bit);
  311. if (nalu->send_marker_bit) {
  312. stream->frames_sent++;
  313. }
  314. }
  315. } else if (packet->type == OBS_ENCODER_AUDIO) {
  316. bytes_sent += ftl_ingest_send_media_dts(
  317. &stream->ftl_handle,
  318. FTL_AUDIO_DATA,
  319. packet->dts_usec,
  320. packet->data,
  321. (int)packet->size, 0);
  322. } else {
  323. warn("Got packet type %d", packet->type);
  324. }
  325. if (is_header) {
  326. bfree(packet->data);
  327. }
  328. else {
  329. obs_encoder_packet_release(packet);
  330. }
  331. stream->total_bytes_sent += bytes_sent;
  332. return ret;
  333. }
  334. static void set_peak_bitrate(struct ftl_stream *stream)
  335. {
  336. int speedtest_kbps = 15000;
  337. int speedtest_duration = 1000;
  338. speed_test_t results;
  339. ftl_status_t status_code;
  340. status_code = ftl_ingest_speed_test_ex(
  341. &stream->ftl_handle,
  342. speedtest_kbps,
  343. speedtest_duration,
  344. &results);
  345. float percent_lost = 0;
  346. if (status_code == FTL_SUCCESS) {
  347. percent_lost = (float)results.lost_pkts * 100.f /
  348. (float)results.pkts_sent;
  349. } else {
  350. warn("Speed test failed with: %s",
  351. ftl_status_code_to_string(status_code));
  352. }
  353. // Get what the user set the encoding bitrate to.
  354. obs_encoder_t *video_encoder = obs_output_get_video_encoder(stream->output);
  355. obs_data_t *video_settings = obs_encoder_get_settings(video_encoder);
  356. int user_desired_bitrate = (int)obs_data_get_int(video_settings, "bitrate");
  357. obs_data_release(video_settings);
  358. // Report the results.
  359. info("Speed test completed: User desired bitrate %d, Peak kbps %d, "
  360. "initial rtt %d, "
  361. "final rtt %d, %3.2f lost packets",
  362. user_desired_bitrate,
  363. results.peak_kbps,
  364. results.starting_rtt,
  365. results.ending_rtt,
  366. percent_lost);
  367. // We still want to set the peak to about 1.2x what the target bitrate is,
  368. // even if the speed test reported it should be lower. If we don't, FTL
  369. // will queue data on the client and start adding latency. If the internet
  370. // connection really can't handle the bitrate the user will see either lost frame
  371. // and recovered frame counts go up, which is reflect in the dropped_frames count.
  372. stream->peak_kbps = stream->params.peak_kbps = user_desired_bitrate * 1.2;
  373. ftl_ingest_update_params(&stream->ftl_handle, &stream->params);
  374. }
  375. static inline bool send_headers(struct ftl_stream *stream, int64_t dts_usec);
  376. static inline bool can_shutdown_stream(struct ftl_stream *stream,
  377. struct encoder_packet *packet)
  378. {
  379. uint64_t cur_time = os_gettime_ns();
  380. bool timeout = cur_time >= stream->shutdown_timeout_ts;
  381. if (timeout)
  382. info("Stream shutdown timeout reached (%d second(s))",
  383. stream->max_shutdown_time_sec);
  384. return timeout || packet->sys_dts_usec >= (int64_t)stream->stop_ts;
  385. }
  386. static void *send_thread(void *data)
  387. {
  388. struct ftl_stream *stream = data;
  389. ftl_status_t status_code;
  390. os_set_thread_name("ftl-stream: send_thread");
  391. while (os_sem_wait(stream->send_sem) == 0) {
  392. struct encoder_packet packet;
  393. if (stopping(stream) && stream->stop_ts == 0) {
  394. break;
  395. }
  396. if (!get_next_packet(stream, &packet))
  397. continue;
  398. if (stopping(stream)) {
  399. if (can_shutdown_stream(stream, &packet)) {
  400. obs_encoder_packet_release(&packet);
  401. break;
  402. }
  403. }
  404. /* sends sps/pps on every key frame as this is typically
  405. * required for webrtc */
  406. if (packet.keyframe) {
  407. if (!send_headers(stream, packet.dts_usec)) {
  408. os_atomic_set_bool(&stream->disconnected, true);
  409. break;
  410. }
  411. }
  412. if (send_packet(stream, &packet, false, packet.track_idx) < 0) {
  413. os_atomic_set_bool(&stream->disconnected, true);
  414. break;
  415. }
  416. }
  417. if (disconnected(stream)) {
  418. info("Disconnected from %s", stream->path.array);
  419. } else {
  420. info("User stopped the stream");
  421. }
  422. if (!stopping(stream)) {
  423. pthread_detach(stream->send_thread);
  424. obs_output_signal_stop(stream->output, OBS_OUTPUT_DISCONNECTED);
  425. } else {
  426. obs_output_end_data_capture(stream->output);
  427. }
  428. info("ingest disconnect");
  429. status_code = ftl_ingest_disconnect(&stream->ftl_handle);
  430. if (status_code != FTL_SUCCESS) {
  431. printf("Failed to disconnect from ingest %d", status_code);
  432. }
  433. free_packets(stream);
  434. os_event_reset(stream->stop_event);
  435. os_atomic_set_bool(&stream->active, false);
  436. stream->sent_headers = false;
  437. return NULL;
  438. }
  439. static bool send_video_header(struct ftl_stream *stream, int64_t dts_usec)
  440. {
  441. obs_output_t *context = stream->output;
  442. obs_encoder_t *vencoder = obs_output_get_video_encoder(context);
  443. uint8_t *header;
  444. size_t size;
  445. struct encoder_packet packet = {
  446. .type = OBS_ENCODER_VIDEO,
  447. .timebase_den = 1,
  448. .keyframe = true,
  449. .dts_usec = dts_usec
  450. };
  451. obs_encoder_get_extra_data(vencoder, &header, &size);
  452. packet.size = obs_parse_avc_header(&packet.data, header, size);
  453. return send_packet(stream, &packet, true, 0) >= 0;
  454. }
  455. static inline bool send_headers(struct ftl_stream *stream, int64_t dts_usec)
  456. {
  457. stream->sent_headers = true;
  458. if (!send_video_header(stream, dts_usec))
  459. return false;
  460. return true;
  461. }
  462. static inline bool reset_semaphore(struct ftl_stream *stream)
  463. {
  464. os_sem_destroy(stream->send_sem);
  465. return os_sem_init(&stream->send_sem, 0) == 0;
  466. }
  467. #ifdef _WIN32
  468. #define socklen_t int
  469. #endif
  470. static int init_send(struct ftl_stream *stream)
  471. {
  472. int ret;
  473. reset_semaphore(stream);
  474. ret = pthread_create(&stream->send_thread, NULL, send_thread, stream);
  475. if (ret != 0) {
  476. warn("Failed to create send thread");
  477. return OBS_OUTPUT_ERROR;
  478. }
  479. os_atomic_set_bool(&stream->active, true);
  480. obs_output_begin_data_capture(stream->output, 0);
  481. return OBS_OUTPUT_SUCCESS;
  482. }
  483. static int lookup_ingest_ip(const char *ingest_location, char *ingest_ip)
  484. {
  485. struct hostent *remoteHost;
  486. struct in_addr addr;
  487. int retval = -1;
  488. ingest_ip[0] = '\0';
  489. remoteHost = gethostbyname(ingest_location);
  490. if (remoteHost && remoteHost->h_addrtype == AF_INET) {
  491. int i = 0;
  492. while (remoteHost->h_addr_list[i] != 0) {
  493. addr.s_addr = *(u_long *)remoteHost->h_addr_list[i++];
  494. blog(LOG_INFO, "IP Address #%d of ingest is: %s",
  495. i, inet_ntoa(addr));
  496. /*only use the first ip found*/
  497. if (strlen(ingest_ip) == 0) {
  498. strcpy(ingest_ip, inet_ntoa(addr));
  499. retval = 0;
  500. }
  501. }
  502. }
  503. return retval;
  504. }
  505. static int try_connect(struct ftl_stream *stream)
  506. {
  507. ftl_status_t status_code;
  508. if (dstr_is_empty(&stream->path)) {
  509. warn("URL is empty");
  510. return OBS_OUTPUT_BAD_PATH;
  511. }
  512. info("Connecting to FTL Ingest URL %s...", stream->path.array);
  513. stream->width = (int)obs_output_get_width(stream->output);
  514. stream->height = (int)obs_output_get_height(stream->output);
  515. status_code = ftl_ingest_connect(&stream->ftl_handle);
  516. if (status_code != FTL_SUCCESS) {
  517. warn("Ingest connect failed with: %s (%d)",
  518. ftl_status_code_to_string(status_code),
  519. status_code);
  520. return _ftl_error_to_obs_error(status_code);
  521. }
  522. info("Connection to %s successful", stream->path.array);
  523. // Always get the peak bitrate when we are starting.
  524. set_peak_bitrate(stream);
  525. pthread_create(&stream->status_thread, NULL, status_thread, stream);
  526. return init_send(stream);
  527. }
  528. static bool ftl_stream_start(void *data)
  529. {
  530. struct ftl_stream *stream = data;
  531. info("ftl_stream_start");
  532. // Mixer doesn't support bframes. So force them off.
  533. obs_encoder_t *video_encoder = obs_output_get_video_encoder(stream->output);
  534. obs_data_t *video_settings = obs_encoder_get_settings(video_encoder);
  535. obs_data_set_int(video_settings, "bf", 0);
  536. obs_data_release(video_settings);
  537. if (!obs_output_can_begin_data_capture(stream->output, 0)) {
  538. return false;
  539. }
  540. if (!obs_output_initialize_encoders(stream->output, 0)) {
  541. return false;
  542. }
  543. stream->frames_sent = 0;
  544. os_atomic_set_bool(&stream->connecting, true);
  545. return pthread_create(&stream->connect_thread, NULL, connect_thread,
  546. stream) == 0;
  547. }
  548. static inline bool add_packet(struct ftl_stream *stream,
  549. struct encoder_packet *packet)
  550. {
  551. circlebuf_push_back(&stream->packets, packet,
  552. sizeof(struct encoder_packet));
  553. return true;
  554. }
  555. static inline size_t num_buffered_packets(struct ftl_stream *stream)
  556. {
  557. return stream->packets.size / sizeof(struct encoder_packet);
  558. }
  559. static void drop_frames(struct ftl_stream *stream, const char *name,
  560. int highest_priority, bool pframes)
  561. {
  562. UNUSED_PARAMETER(pframes);
  563. struct circlebuf new_buf = {0};
  564. int num_frames_dropped = 0;
  565. #ifdef _DEBUG
  566. int start_packets = (int)num_buffered_packets(stream);
  567. #else
  568. UNUSED_PARAMETER(name);
  569. #endif
  570. circlebuf_reserve(&new_buf, sizeof(struct encoder_packet) * 8);
  571. while (stream->packets.size) {
  572. struct encoder_packet packet;
  573. circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
  574. /* do not drop audio data or video keyframes */
  575. if (packet.type == OBS_ENCODER_AUDIO ||
  576. packet.drop_priority >= highest_priority) {
  577. circlebuf_push_back(&new_buf, &packet, sizeof(packet));
  578. } else {
  579. num_frames_dropped++;
  580. obs_encoder_packet_release(&packet);
  581. }
  582. }
  583. circlebuf_free(&stream->packets);
  584. stream->packets = new_buf;
  585. if (stream->min_priority < highest_priority)
  586. stream->min_priority = highest_priority;
  587. if (!num_frames_dropped)
  588. return;
  589. stream->dropped_frames += num_frames_dropped;
  590. #ifdef _DEBUG
  591. debug("Dropped %s, prev packet count: %d, new packet count: %d",
  592. name,
  593. start_packets,
  594. (int)num_buffered_packets(stream));
  595. #endif
  596. }
  597. static bool find_first_video_packet(struct ftl_stream *stream,
  598. struct encoder_packet *first)
  599. {
  600. size_t count = stream->packets.size / sizeof(*first);
  601. for (size_t i = 0; i < count; i++) {
  602. struct encoder_packet *cur = circlebuf_data(&stream->packets,
  603. i * sizeof(*first));
  604. if (cur->type == OBS_ENCODER_VIDEO && !cur->keyframe) {
  605. *first = *cur;
  606. return true;
  607. }
  608. }
  609. return false;
  610. }
  611. static void check_to_drop_frames(struct ftl_stream *stream, bool pframes)
  612. {
  613. struct encoder_packet first;
  614. int64_t buffer_duration_usec;
  615. size_t num_packets = num_buffered_packets(stream);
  616. const char *name = pframes ? "p-frames" : "b-frames";
  617. int priority = pframes ?
  618. OBS_NAL_PRIORITY_HIGHEST : OBS_NAL_PRIORITY_HIGH;
  619. int64_t drop_threshold = pframes ?
  620. stream->pframe_drop_threshold_usec :
  621. stream->drop_threshold_usec;
  622. if (num_packets < 5) {
  623. if (!pframes)
  624. stream->congestion = 0.0f;
  625. return;
  626. }
  627. if (!find_first_video_packet(stream, &first))
  628. return;
  629. /* if the amount of time stored in the buffered packets waiting to be
  630. * sent is higher than threshold, drop frames */
  631. buffer_duration_usec = stream->last_dts_usec - first.dts_usec;
  632. if (!pframes) {
  633. stream->congestion = (float)buffer_duration_usec /
  634. (float)drop_threshold;
  635. }
  636. if (buffer_duration_usec > drop_threshold) {
  637. debug("buffer_duration_usec: %" PRId64, buffer_duration_usec);
  638. drop_frames(stream, name, priority, pframes);
  639. }
  640. }
  641. static bool add_video_packet(struct ftl_stream *stream,
  642. struct encoder_packet *packet)
  643. {
  644. check_to_drop_frames(stream, false);
  645. check_to_drop_frames(stream, true);
  646. /* if currently dropping frames, drop packets until it reaches the
  647. * desired priority */
  648. if (packet->priority < stream->min_priority) {
  649. stream->dropped_frames++;
  650. return false;
  651. } else {
  652. stream->min_priority = 0;
  653. }
  654. stream->last_dts_usec = packet->dts_usec;
  655. return add_packet(stream, packet);
  656. }
  657. static void ftl_stream_data(void *data, struct encoder_packet *packet)
  658. {
  659. struct ftl_stream *stream = data;
  660. struct encoder_packet new_packet;
  661. bool added_packet = false;
  662. if (disconnected(stream) || !active(stream))
  663. return;
  664. if (packet->type == OBS_ENCODER_VIDEO)
  665. obs_parse_avc_packet(&new_packet, packet);
  666. else
  667. obs_encoder_packet_ref(&new_packet, packet);
  668. pthread_mutex_lock(&stream->packets_mutex);
  669. if (!disconnected(stream)) {
  670. added_packet = (packet->type == OBS_ENCODER_VIDEO) ?
  671. add_video_packet(stream, &new_packet) :
  672. add_packet(stream, &new_packet);
  673. }
  674. pthread_mutex_unlock(&stream->packets_mutex);
  675. if (added_packet)
  676. os_sem_post(stream->send_sem);
  677. else
  678. obs_encoder_packet_release(&new_packet);
  679. }
  680. static void ftl_stream_defaults(obs_data_t *defaults)
  681. {
  682. UNUSED_PARAMETER(defaults);
  683. }
  684. static obs_properties_t *ftl_stream_properties(void *unused)
  685. {
  686. UNUSED_PARAMETER(unused);
  687. obs_properties_t *props = obs_properties_create();
  688. obs_properties_add_int(props, "peak_bitrate_kbps",
  689. obs_module_text("FTLStream.PeakBitrate"),
  690. 1000, 10000, 500);
  691. return props;
  692. }
  693. static uint64_t ftl_stream_total_bytes_sent(void *data)
  694. {
  695. struct ftl_stream *stream = data;
  696. return stream->total_bytes_sent;
  697. }
  698. static int ftl_stream_dropped_frames(void *data)
  699. {
  700. struct ftl_stream *stream = data;
  701. return stream->dropped_frames;
  702. }
  703. static float ftl_stream_congestion(void *data)
  704. {
  705. struct ftl_stream *stream = data;
  706. return stream->min_priority > 0 ? 1.0f : stream->congestion;
  707. }
  708. enum ret_type {
  709. RET_CONTINUE,
  710. RET_BREAK,
  711. RET_EXIT,
  712. };
  713. static enum ret_type ftl_event(struct ftl_stream *stream,
  714. ftl_status_msg_t status)
  715. {
  716. if (status.msg.event.type != FTL_STATUS_EVENT_TYPE_DISCONNECTED)
  717. return RET_CONTINUE;
  718. info("Disconnected from ingest with reason: %s",
  719. ftl_status_code_to_string(status.msg.event.error_code));
  720. if (status.msg.event.reason == FTL_STATUS_EVENT_REASON_API_REQUEST) {
  721. return RET_BREAK;
  722. }
  723. //tell OBS and it will trigger a reconnection
  724. blog(LOG_WARNING, "Reconnecting to Ingest");
  725. obs_output_signal_stop(stream->output, OBS_OUTPUT_DISCONNECTED);
  726. return RET_EXIT;
  727. }
  728. static void *status_thread(void *data)
  729. {
  730. struct ftl_stream *stream = data;
  731. ftl_status_msg_t status;
  732. ftl_status_t status_code;
  733. while (!disconnected(stream)) {
  734. status_code = ftl_ingest_get_status(&stream->ftl_handle,
  735. &status, 1000);
  736. if (status_code == FTL_STATUS_TIMEOUT ||
  737. status_code == FTL_QUEUE_EMPTY) {
  738. continue;
  739. } else if (status_code == FTL_NOT_INITIALIZED) {
  740. break;
  741. }
  742. if (status.type == FTL_STATUS_EVENT) {
  743. enum ret_type ret_type = ftl_event(stream, status);
  744. if (ret_type == RET_EXIT)
  745. return NULL;
  746. else if (ret_type == RET_BREAK)
  747. break;
  748. } else if(status.type == FTL_STATUS_LOG) {
  749. blog(LOG_INFO, "[%d] %s", status.msg.log.log_level,
  750. status.msg.log.string);
  751. } else if (status.type == FTL_STATUS_VIDEO_PACKETS) {
  752. ftl_packet_stats_msg_t *p = &status.msg.pkt_stats;
  753. // Report nack requests as dropped frames
  754. stream->dropped_frames +=
  755. p->nack_reqs -stream->last_nack_count;
  756. stream->last_nack_count = p->nack_reqs;
  757. int log_level = p->nack_reqs > 2 ? LOG_INFO : LOG_DEBUG;
  758. blog(log_level, "Avg packet send per second %3.1f, "
  759. "total nack requests %d",
  760. (float)p->sent * 1000.f / p->period,
  761. p->nack_reqs);
  762. } else if (status.type == FTL_STATUS_VIDEO_PACKETS_INSTANT) {
  763. ftl_packet_stats_instant_msg_t *p =
  764. &status.msg.ipkt_stats;
  765. int log_level = p->avg_rtt > 200 ? LOG_INFO : LOG_DEBUG;
  766. blog(log_level, "avg transmit delay %dms "
  767. "(min: %d, max: %d), "
  768. "avg rtt %dms (min: %d, max: %d)",
  769. p->avg_xmit_delay,
  770. p->min_xmit_delay, p->max_xmit_delay,
  771. p->avg_rtt, p->min_rtt, p->max_rtt);
  772. } else if (status.type == FTL_STATUS_VIDEO) {
  773. ftl_video_frame_stats_msg_t *v =
  774. &status.msg.video_stats;
  775. int log_level = v->queue_fullness > 5 ?
  776. LOG_INFO : LOG_DEBUG;
  777. blog(log_level, "Queue an average of %3.2f fps "
  778. "(%3.1f kbps), "
  779. "sent an average of %3.2f fps "
  780. "(%3.1f kbps), "
  781. "queue fullness %d, "
  782. "max frame size %d",
  783. (float)v->frames_queued * 1000.f / v->period,
  784. (float)v->bytes_queued / v->period * 8,
  785. (float)v->frames_sent * 1000.f / v->period,
  786. (float)v->bytes_sent / v->period * 8,
  787. v->queue_fullness, v->max_frame_size);
  788. } else {
  789. blog(LOG_DEBUG, "Status: Got Status message of type "
  790. "%d", status.type);
  791. }
  792. }
  793. blog(LOG_DEBUG, "status_thread: Exited");
  794. pthread_detach(stream->status_thread);
  795. return NULL;
  796. }
  797. static void *connect_thread(void *data)
  798. {
  799. struct ftl_stream *stream = data;
  800. int ret;
  801. os_set_thread_name("ftl-stream: connect_thread");
  802. blog(LOG_WARNING, "ftl-stream: connect thread");
  803. ret = init_connect(stream);
  804. if (ret != OBS_OUTPUT_SUCCESS) {
  805. obs_output_signal_stop(stream->output, ret);
  806. return NULL;
  807. }
  808. ret = try_connect(stream);
  809. if (ret != OBS_OUTPUT_SUCCESS) {
  810. obs_output_signal_stop(stream->output, ret);
  811. info("Connection to %s failed: %d", stream->path.array, ret);
  812. }
  813. if (!stopping(stream))
  814. pthread_detach(stream->connect_thread);
  815. os_atomic_set_bool(&stream->connecting, false);
  816. return NULL;
  817. }
  818. static void log_libftl_messages(ftl_log_severity_t log_level,
  819. const char * message)
  820. {
  821. UNUSED_PARAMETER(log_level);
  822. blog(LOG_WARNING, "[libftl] %s", message);
  823. }
  824. static int init_connect(struct ftl_stream *stream)
  825. {
  826. obs_service_t *service;
  827. obs_data_t *settings;
  828. const char *bind_ip, *key;
  829. ftl_status_t status_code;
  830. info("init_connect");
  831. if (stopping(stream))
  832. pthread_join(stream->send_thread, NULL);
  833. free_packets(stream);
  834. service = obs_output_get_service(stream->output);
  835. if (!service) {
  836. return OBS_OUTPUT_ERROR;
  837. }
  838. os_atomic_set_bool(&stream->disconnected, false);
  839. stream->total_bytes_sent = 0;
  840. stream->dropped_frames = 0;
  841. stream->min_priority = 0;
  842. settings = obs_output_get_settings(stream->output);
  843. obs_encoder_t *video_encoder =
  844. obs_output_get_video_encoder(stream->output);
  845. obs_data_t *video_settings =
  846. obs_encoder_get_settings(video_encoder);
  847. dstr_copy(&stream->path, obs_service_get_url(service));
  848. key = obs_service_get_key(service);
  849. struct obs_video_info ovi;
  850. int fps_num = 30, fps_den = 1;
  851. if (obs_get_video_info(&ovi)) {
  852. fps_num = ovi.fps_num;
  853. fps_den = ovi.fps_den;
  854. }
  855. int target_bitrate = (int)obs_data_get_int(video_settings, "bitrate");
  856. int peak_bitrate = (int)((float)target_bitrate * 1.1f);
  857. //minimum overshoot tolerance of 10%
  858. if (peak_bitrate < target_bitrate) {
  859. peak_bitrate = target_bitrate;
  860. }
  861. stream->params.stream_key = (char*)key;
  862. stream->params.video_codec = FTL_VIDEO_H264;
  863. stream->params.audio_codec = FTL_AUDIO_OPUS;
  864. stream->params.ingest_hostname = stream->path.array;
  865. stream->params.vendor_name = "OBS Studio";
  866. stream->params.vendor_version = OBS_VERSION;
  867. stream->params.peak_kbps =
  868. stream->peak_kbps < 0 ? 0 : stream->peak_kbps;
  869. //not required when using ftl_ingest_send_media_dts
  870. stream->params.fps_num = 0;
  871. stream->params.fps_den = 0;
  872. status_code = ftl_ingest_create(&stream->ftl_handle, &stream->params);
  873. if (status_code != FTL_SUCCESS) {
  874. if (status_code == FTL_BAD_OR_INVALID_STREAM_KEY) {
  875. blog(LOG_ERROR, "Invalid Key (%s)",
  876. ftl_status_code_to_string(status_code));
  877. return OBS_OUTPUT_INVALID_STREAM;
  878. }
  879. else {
  880. blog(LOG_ERROR, "Failed to create ingest handle (%s)",
  881. ftl_status_code_to_string(status_code));
  882. return OBS_OUTPUT_ERROR;
  883. }
  884. }
  885. dstr_copy(&stream->username, obs_service_get_username(service));
  886. dstr_copy(&stream->password, obs_service_get_password(service));
  887. dstr_depad(&stream->path);
  888. stream->drop_threshold_usec =
  889. (int64_t)obs_data_get_int(settings, OPT_DROP_THRESHOLD) * 1000;
  890. stream->max_shutdown_time_sec =
  891. (int)obs_data_get_int(settings, OPT_MAX_SHUTDOWN_TIME_SEC);
  892. bind_ip = obs_data_get_string(settings, OPT_BIND_IP);
  893. dstr_copy(&stream->bind_ip, bind_ip);
  894. obs_data_release(settings);
  895. obs_data_release(video_settings);
  896. return OBS_OUTPUT_SUCCESS;
  897. }
  898. // Returns 0 on success
  899. static int _ftl_error_to_obs_error(int status)
  900. {
  901. /* Map FTL errors to OBS errors */
  902. switch (status) {
  903. case FTL_SUCCESS:
  904. return OBS_OUTPUT_SUCCESS;
  905. case FTL_SOCKET_NOT_CONNECTED:
  906. case FTL_MALLOC_FAILURE:
  907. case FTL_INTERNAL_ERROR:
  908. case FTL_CONFIG_ERROR:
  909. case FTL_NOT_ACTIVE_STREAM:
  910. case FTL_NOT_CONNECTED:
  911. case FTL_ALREADY_CONNECTED:
  912. case FTL_STATUS_TIMEOUT:
  913. case FTL_QUEUE_FULL:
  914. case FTL_STATUS_WAITING_FOR_KEY_FRAME:
  915. case FTL_QUEUE_EMPTY:
  916. case FTL_NOT_INITIALIZED:
  917. return OBS_OUTPUT_ERROR;
  918. case FTL_BAD_REQUEST:
  919. case FTL_DNS_FAILURE:
  920. case FTL_CONNECT_ERROR:
  921. case FTL_UNSUPPORTED_MEDIA_TYPE:
  922. case FTL_OLD_VERSION:
  923. case FTL_UNAUTHORIZED:
  924. case FTL_AUDIO_SSRC_COLLISION:
  925. case FTL_VIDEO_SSRC_COLLISION:
  926. case FTL_STREAM_REJECTED:
  927. case FTL_BAD_OR_INVALID_STREAM_KEY:
  928. case FTL_CHANNEL_IN_USE:
  929. case FTL_REGION_UNSUPPORTED:
  930. case FTL_GAME_BLOCKED:
  931. return OBS_OUTPUT_CONNECT_FAILED;
  932. case FTL_NO_MEDIA_TIMEOUT:
  933. return OBS_OUTPUT_DISCONNECTED;
  934. case FTL_USER_DISCONNECT:
  935. return OBS_OUTPUT_SUCCESS;
  936. case FTL_UNKNOWN_ERROR_CODE:
  937. default:
  938. /* Unknown FTL error */
  939. return OBS_OUTPUT_ERROR;
  940. }
  941. }
  942. struct obs_output_info ftl_output_info = {
  943. .id = "ftl_output",
  944. .flags = OBS_OUTPUT_AV |
  945. OBS_OUTPUT_ENCODED |
  946. OBS_OUTPUT_SERVICE,
  947. .encoded_video_codecs = "h264",
  948. .encoded_audio_codecs = "opus",
  949. .get_name = ftl_stream_getname,
  950. .create = ftl_stream_create,
  951. .destroy = ftl_stream_destroy,
  952. .start = ftl_stream_start,
  953. .stop = ftl_stream_stop,
  954. .encoded_packet = ftl_stream_data,
  955. .get_defaults = ftl_stream_defaults,
  956. .get_properties = ftl_stream_properties,
  957. .get_total_bytes = ftl_stream_total_bytes_sent,
  958. .get_congestion = ftl_stream_congestion,
  959. .get_dropped_frames = ftl_stream_dropped_frames
  960. };