rtmp-stream.c 15 KB


  1. /******************************************************************************
  2. Copyright (C) 2014 by Hugh Bailey <[email protected]>
  3. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 2 of the License, or
  6. (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ******************************************************************************/
  14. #include <obs.h>
  15. #include <obs-avc.h>
  16. #include <util/platform.h>
  17. #include <util/circlebuf.h>
  18. #include <util/dstr.h>
  19. #include <util/threading.h>
  20. #include <inttypes.h>
  21. #include "librtmp/rtmp.h"
  22. #include "librtmp/log.h"
  23. #include "flv-mux.h"
  24. //#define FILE_TEST
  25. //#define TEST_FRAMEDROPS
  26. struct rtmp_stream {
  27. obs_output_t output;
  28. pthread_mutex_t packets_mutex;
  29. struct circlebuf packets;
  30. bool connecting;
  31. pthread_t connect_thread;
  32. bool active;
  33. pthread_t send_thread;
  34. os_sem_t send_sem;
  35. os_event_t stop_event;
  36. struct dstr path, key;
  37. struct dstr username, password;
  38. /* frame drop variables */
  39. int64_t drop_threshold_usec;
  40. int64_t min_drop_dts_usec;
  41. int min_priority;
  42. int64_t last_dts_usec;
  43. #ifdef FILE_TEST
  44. FILE *test;
  45. #endif
  46. RTMP rtmp;
  47. };
  /* Returns the display name of this output type; the locale is not
   * consulted yet. */
  static const char *rtmp_stream_getname(const char *locale)
  {
      static const char *const display_name = "RTMP Stream";

      /* TODO: locale stuff */
      UNUSED_PARAMETER(locale);
      return display_name;
  }
  54. static void log_rtmp(int level, const char *format, va_list args)
  55. {
  56. if (level > RTMP_LOGWARNING)
  57. return;
  58. blogva(LOG_INFO, format, args);
  59. }
  60. static inline void free_packets(struct rtmp_stream *stream)
  61. {
  62. while (stream->packets.size) {
  63. struct encoder_packet packet;
  64. circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
  65. obs_free_encoder_packet(&packet);
  66. }
  67. }
  68. static void rtmp_stream_stop(void *data);
  69. static void rtmp_stream_destroy(void *data)
  70. {
  71. struct rtmp_stream *stream = data;
  72. if (stream->active)
  73. rtmp_stream_stop(data);
  74. if (stream) {
  75. free_packets(stream);
  76. dstr_free(&stream->path);
  77. dstr_free(&stream->key);
  78. dstr_free(&stream->username);
  79. dstr_free(&stream->password);
  80. os_event_destroy(stream->stop_event);
  81. os_sem_destroy(stream->send_sem);
  82. pthread_mutex_destroy(&stream->packets_mutex);
  83. circlebuf_free(&stream->packets);
  84. bfree(stream);
  85. }
  86. }
  87. static void *rtmp_stream_create(obs_data_t settings, obs_output_t output)
  88. {
  89. struct rtmp_stream *stream = bzalloc(sizeof(struct rtmp_stream));
  90. stream->output = output;
  91. pthread_mutex_init_value(&stream->packets_mutex);
  92. RTMP_Init(&stream->rtmp);
  93. RTMP_LogSetCallback(log_rtmp);
  94. RTMP_LogSetLevel(RTMP_LOGWARNING);
  95. if (pthread_mutex_init(&stream->packets_mutex, NULL) != 0)
  96. goto fail;
  97. if (os_event_init(&stream->stop_event, OS_EVENT_TYPE_MANUAL) != 0)
  98. goto fail;
  99. UNUSED_PARAMETER(settings);
  100. return stream;
  101. fail:
  102. rtmp_stream_destroy(stream);
  103. return NULL;
  104. }
  105. static void rtmp_stream_stop(void *data)
  106. {
  107. struct rtmp_stream *stream = data;
  108. void *ret;
  109. #ifdef FILE_TEST
  110. fclose(stream->test);
  111. #endif
  112. os_event_signal(stream->stop_event);
  113. if (stream->connecting)
  114. pthread_join(stream->connect_thread, &ret);
  115. if (stream->active) {
  116. obs_output_end_data_capture(stream->output);
  117. os_sem_post(stream->send_sem);
  118. pthread_join(stream->send_thread, &ret);
  119. RTMP_Close(&stream->rtmp);
  120. }
  121. os_event_reset(stream->stop_event);
  122. }
  123. static inline void set_rtmp_str(AVal *val, const char *str)
  124. {
  125. bool valid = (str && *str);
  126. val->av_val = valid ? (char*)str : NULL;
  127. val->av_len = valid ? (int)strlen(str) : 0;
  128. }
  129. static inline void set_rtmp_dstr(AVal *val, struct dstr *str)
  130. {
  131. bool valid = !dstr_isempty(str);
  132. val->av_val = valid ? str->array : NULL;
  133. val->av_len = valid ? (int)str->len : 0;
  134. }
  135. static inline bool get_next_packet(struct rtmp_stream *stream,
  136. struct encoder_packet *packet)
  137. {
  138. bool new_packet = false;
  139. pthread_mutex_lock(&stream->packets_mutex);
  140. if (stream->packets.size) {
  141. circlebuf_pop_front(&stream->packets, packet,
  142. sizeof(struct encoder_packet));
  143. new_packet = true;
  144. }
  145. pthread_mutex_unlock(&stream->packets_mutex);
  146. return new_packet;
  147. }
  148. static int send_packet(struct rtmp_stream *stream,
  149. struct encoder_packet *packet, bool is_header)
  150. {
  151. uint8_t *data;
  152. size_t size;
  153. int ret = 0;
  154. flv_packet_mux(packet, &data, &size, is_header);
  155. #ifdef FILE_TEST
  156. fwrite(data, 1, size, stream->test);
  157. #else
  158. ret = RTMP_Write(&stream->rtmp, (char*)data, (int)size);
  159. #endif
  160. bfree(data);
  161. obs_free_encoder_packet(packet);
  162. return ret;
  163. }
  164. static bool send_remaining_packets(struct rtmp_stream *stream)
  165. {
  166. struct encoder_packet packet;
  167. while (get_next_packet(stream, &packet))
  168. if (send_packet(stream, &packet, false) < 0)
  169. return false;
  170. return true;
  171. }
  172. static void *send_thread(void *data)
  173. {
  174. struct rtmp_stream *stream = data;
  175. bool disconnected = false;
  176. while (os_sem_wait(stream->send_sem) == 0) {
  177. struct encoder_packet packet;
  178. if (os_event_try(stream->stop_event) != EAGAIN)
  179. break;
  180. if (!get_next_packet(stream, &packet))
  181. continue;
  182. if (send_packet(stream, &packet, false) < 0) {
  183. disconnected = true;
  184. break;
  185. }
  186. }
  187. if (!disconnected && !send_remaining_packets(stream))
  188. disconnected = true;
  189. if (disconnected) {
  190. blog(LOG_INFO, "Disconnected from %s", stream->path.array);
  191. free_packets(stream);
  192. }
  193. if (os_event_try(stream->stop_event) == EAGAIN) {
  194. pthread_detach(stream->send_thread);
  195. obs_output_signal_stop(stream->output, OBS_OUTPUT_DISCONNECTED);
  196. }
  197. stream->active = false;
  198. return NULL;
  199. }
  200. static void send_meta_data(struct rtmp_stream *stream)
  201. {
  202. uint8_t *meta_data;
  203. size_t meta_data_size;
  204. flv_meta_data(stream->output, &meta_data, &meta_data_size);
  205. #ifdef FILE_TEST
  206. fwrite(meta_data, 1, meta_data_size, stream->test);
  207. #else
  208. RTMP_Write(&stream->rtmp, (char*)meta_data, (int)meta_data_size);
  209. #endif
  210. bfree(meta_data);
  211. }
  212. static void send_audio_header(struct rtmp_stream *stream)
  213. {
  214. obs_output_t context = stream->output;
  215. obs_encoder_t aencoder = obs_output_get_audio_encoder(context);
  216. uint8_t *header;
  217. struct encoder_packet packet = {
  218. .type = OBS_ENCODER_AUDIO,
  219. .timebase_den = 1
  220. };
  221. obs_encoder_get_extra_data(aencoder, &header, &packet.size);
  222. packet.data = bmemdup(header, packet.size);
  223. send_packet(stream, &packet, true);
  224. }
  225. static void send_video_header(struct rtmp_stream *stream)
  226. {
  227. obs_output_t context = stream->output;
  228. obs_encoder_t vencoder = obs_output_get_video_encoder(context);
  229. uint8_t *header;
  230. size_t size;
  231. struct encoder_packet packet = {
  232. .type = OBS_ENCODER_VIDEO,
  233. .timebase_den = 1,
  234. .keyframe = true
  235. };
  236. obs_encoder_get_extra_data(vencoder, &header, &size);
  237. packet.size = obs_parse_avc_header(&packet.data, header, size);
  238. send_packet(stream, &packet, true);
  239. }
  /* Sends the stream preamble in order: metadata, audio header, video
   * header.  In FILE_TEST builds the dump file is opened first. */
  static void send_headers(struct rtmp_stream *stream)
  {
  #ifdef FILE_TEST
      stream->test = os_fopen("D:\\bla.flv", "wb");
  #endif

      send_meta_data(stream);
      send_audio_header(stream);
      send_video_header(stream);
  }
  249. static inline bool reset_semaphore(struct rtmp_stream *stream)
  250. {
  251. os_sem_destroy(stream->send_sem);
  252. return os_sem_init(&stream->send_sem, 0) == 0;
  253. }
  254. #ifdef _WIN32
  255. #define socklen_t int
  256. #endif
  257. #define MIN_SENDBUF_SIZE 65535
  258. static void adjust_sndbuf_size(struct rtmp_stream *stream, int new_size)
  259. {
  260. int cur_sendbuf_size = new_size;
  261. socklen_t int_size = sizeof(int);
  262. #ifndef TEST_FRAMEDROPS
  263. getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF,
  264. (char*)&cur_sendbuf_size, &int_size);
  265. if (cur_sendbuf_size < new_size) {
  266. cur_sendbuf_size = new_size;
  267. #else
  268. {cur_sendbuf_size = 1024*8;
  269. #endif
  270. setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF,
  271. (const char*)&cur_sendbuf_size, int_size);
  272. }
  273. }
  274. static int init_send(struct rtmp_stream *stream)
  275. {
  276. int ret;
  277. #if defined(_WIN32) && !defined(FILE_TEST)
  278. adjust_sndbuf_size(stream, MIN_SENDBUF_SIZE);
  279. #endif
  280. reset_semaphore(stream);
  281. ret = pthread_create(&stream->send_thread, NULL, send_thread, stream);
  282. if (ret != 0) {
  283. RTMP_Close(&stream->rtmp);
  284. blog(LOG_ERROR, __FILE__": Failed to create send thread");
  285. return OBS_OUTPUT_ERROR;
  286. }
  287. stream->active = true;
  288. send_headers(stream);
  289. obs_output_begin_data_capture(stream->output, 0);
  290. return OBS_OUTPUT_SUCCESS;
  291. }
  292. static int try_connect(struct rtmp_stream *stream)
  293. {
  294. #ifndef FILE_TEST
  295. if (dstr_isempty(&stream->path)) {
  296. blog(LOG_WARNING, FILE_LINE "URL is empty");
  297. return OBS_OUTPUT_BAD_PATH;
  298. }
  299. if (dstr_isempty(&stream->key)) {
  300. blog(LOG_WARNING, FILE_LINE "Stream key is empty");
  301. return OBS_OUTPUT_BAD_PATH;
  302. }
  303. blog(LOG_INFO, "Connecting to RTMP URL %s...", stream->path.array);
  304. if (!RTMP_SetupURL2(&stream->rtmp, stream->path.array,
  305. stream->key.array))
  306. return OBS_OUTPUT_BAD_PATH;
  307. RTMP_EnableWrite(&stream->rtmp);
  308. set_rtmp_dstr(&stream->rtmp.Link.pubUser, &stream->username);
  309. set_rtmp_dstr(&stream->rtmp.Link.pubPasswd, &stream->password);
  310. stream->rtmp.Link.swfUrl = stream->rtmp.Link.tcUrl;
  311. set_rtmp_str(&stream->rtmp.Link.flashVer,
  312. "FMLE/3.0 (compatible; FMSc/1.0)");
  313. stream->rtmp.m_outChunkSize = 4096;
  314. stream->rtmp.m_bSendChunkSizeInfo = true;
  315. stream->rtmp.m_bUseNagle = true;
  316. if (!RTMP_Connect(&stream->rtmp, NULL))
  317. return OBS_OUTPUT_CONNECT_FAILED;
  318. if (!RTMP_ConnectStream(&stream->rtmp, 0))
  319. return OBS_OUTPUT_INVALID_STREAM;
  320. blog(LOG_INFO, "Connection to %s successful", stream->path.array);
  321. #endif
  322. return init_send(stream);
  323. }
  324. static void *connect_thread(void *data)
  325. {
  326. struct rtmp_stream *stream = data;
  327. int ret = try_connect(stream);
  328. if (ret != OBS_OUTPUT_SUCCESS) {
  329. obs_output_signal_stop(stream->output, ret);
  330. blog(LOG_INFO, "Connection to %s failed: %d",
  331. stream->path.array, ret);
  332. }
  333. if (os_event_try(stream->stop_event) == EAGAIN)
  334. pthread_detach(stream->connect_thread);
  335. stream->connecting = false;
  336. return NULL;
  337. }
  338. static bool rtmp_stream_start(void *data)
  339. {
  340. struct rtmp_stream *stream = data;
  341. obs_service_t service = obs_output_get_service(stream->output);
  342. obs_data_t settings;
  343. if (!obs_output_can_begin_data_capture(stream->output, 0))
  344. return false;
  345. if (!obs_output_initialize_encoders(stream->output, 0))
  346. return false;
  347. settings = obs_output_get_settings(stream->output);
  348. dstr_copy(&stream->path, obs_service_get_url(service));
  349. dstr_copy(&stream->key, obs_service_get_key(service));
  350. dstr_copy(&stream->username, obs_service_get_username(service));
  351. dstr_copy(&stream->password, obs_service_get_password(service));
  352. stream->drop_threshold_usec =
  353. (int64_t)obs_data_getint(settings, "drop_threshold");
  354. obs_data_release(settings);
  355. return pthread_create(&stream->connect_thread, NULL, connect_thread,
  356. stream) == 0;
  357. }
  358. static inline bool add_packet(struct rtmp_stream *stream,
  359. struct encoder_packet *packet)
  360. {
  361. circlebuf_push_back(&stream->packets, packet,
  362. sizeof(struct encoder_packet));
  363. stream->last_dts_usec = packet->dts_usec;
  364. return true;
  365. }
  366. static inline size_t num_buffered_packets(struct rtmp_stream *stream)
  367. {
  368. return stream->packets.size / sizeof(struct encoder_packet);
  369. }
  370. static void drop_frames(struct rtmp_stream *stream)
  371. {
  372. struct circlebuf new_buf = {0};
  373. int drop_priority = 0;
  374. uint64_t last_drop_dts_usec = 0;
  375. blog(LOG_DEBUG, "Previous packet count: %d",
  376. (int)num_buffered_packets(stream));
  377. circlebuf_reserve(&new_buf, sizeof(struct encoder_packet) * 8);
  378. while (stream->packets.size) {
  379. struct encoder_packet packet;
  380. circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
  381. last_drop_dts_usec = packet.dts_usec;
  382. if (packet.type == OBS_ENCODER_AUDIO) {
  383. circlebuf_push_back(&new_buf, &packet, sizeof(packet));
  384. } else {
  385. if (drop_priority < packet.drop_priority)
  386. drop_priority = packet.drop_priority;
  387. obs_free_encoder_packet(&packet);
  388. }
  389. }
  390. circlebuf_free(&stream->packets);
  391. stream->packets = new_buf;
  392. stream->min_priority = drop_priority;
  393. stream->min_drop_dts_usec = last_drop_dts_usec;
  394. blog(LOG_DEBUG, "New packet count: %d",
  395. (int)num_buffered_packets(stream));
  396. }
  397. static void check_to_drop_frames(struct rtmp_stream *stream)
  398. {
  399. struct encoder_packet first;
  400. int64_t buffer_duration_usec;
  401. if (num_buffered_packets(stream) < 5)
  402. return;
  403. circlebuf_peek_front(&stream->packets, &first, sizeof(first));
  404. /* do not drop frames if frames were just dropped within this time */
  405. if (first.dts_usec < stream->min_drop_dts_usec)
  406. return;
  407. /* if the amount of time stored in the buffered packets waiting to be
  408. * sent is higher than threshold, drop frames */
  409. buffer_duration_usec = stream->last_dts_usec - first.dts_usec;
  410. if (buffer_duration_usec > stream->drop_threshold_usec) {
  411. drop_frames(stream);
  412. blog(LOG_INFO, "dropping %" PRId64 " worth of frames",
  413. buffer_duration_usec);
  414. }
  415. }
  416. static bool add_video_packet(struct rtmp_stream *stream,
  417. struct encoder_packet *packet)
  418. {
  419. check_to_drop_frames(stream);
  420. /* if currently dropping frames, drop packets until it reaches the
  421. * desired priority */
  422. if (packet->priority < stream->min_priority)
  423. return false;
  424. else
  425. stream->min_priority = 0;
  426. return add_packet(stream, packet);
  427. }
  428. static void rtmp_stream_data(void *data, struct encoder_packet *packet)
  429. {
  430. struct rtmp_stream *stream = data;
  431. struct encoder_packet new_packet;
  432. bool added_packet;
  433. if (packet->type == OBS_ENCODER_VIDEO)
  434. obs_parse_avc_packet(&new_packet, packet);
  435. else
  436. obs_duplicate_encoder_packet(&new_packet, packet);
  437. pthread_mutex_lock(&stream->packets_mutex);
  438. added_packet = (packet->type == OBS_ENCODER_VIDEO) ?
  439. add_video_packet(stream, &new_packet) :
  440. add_packet(stream, &new_packet);
  441. pthread_mutex_unlock(&stream->packets_mutex);
  442. if (added_packet)
  443. os_sem_post(stream->send_sem);
  444. else
  445. obs_free_encoder_packet(&new_packet);
  446. }
  447. static void rtmp_stream_defaults(obs_data_t defaults)
  448. {
  449. obs_data_set_default_int(defaults, "drop_threshold", 600000);
  450. }
  451. static obs_properties_t rtmp_stream_properties(const char *locale)
  452. {
  453. obs_properties_t props = obs_properties_create(locale);
  454. /* TODO: locale */
  455. obs_properties_add_text(props, "path", "Stream URL", OBS_TEXT_DEFAULT);
  456. obs_properties_add_text(props, "key", "Stream Key", OBS_TEXT_PASSWORD);
  457. obs_properties_add_text(props, "username", "User Name",
  458. OBS_TEXT_DEFAULT);
  459. obs_properties_add_text(props, "password", "Password",
  460. OBS_TEXT_PASSWORD);
  461. return props;
  462. }
  463. struct obs_output_info rtmp_output_info = {
  464. .id = "rtmp_output",
  465. .flags = OBS_OUTPUT_AV |
  466. OBS_OUTPUT_ENCODED |
  467. OBS_OUTPUT_SERVICE,
  468. .getname = rtmp_stream_getname,
  469. .create = rtmp_stream_create,
  470. .destroy = rtmp_stream_destroy,
  471. .start = rtmp_stream_start,
  472. .stop = rtmp_stream_stop,
  473. .encoded_packet = rtmp_stream_data,
  474. .defaults = rtmp_stream_defaults,
  475. .properties = rtmp_stream_properties
  476. };