fec_manager.h 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /*
  2. * fec_manager.h
  3. *
  4. * Created on: Sep 27, 2017
  5. * Author: root
  6. */
  7. #ifndef FEC_MANAGER_H_
  8. #define FEC_MANAGER_H_
  9. #include "common.h"
  10. #include "log.h"
  11. #include "lib/rs.h"
  12. const int max_blob_packet_num=30000;//how many packet can be contain in a blob_t ,can be set very large
  13. const u32_t anti_replay_buff_size=30000;//can be set very large
  14. const int max_fec_packet_num=255;// this is the limitation of the rs lib
  15. extern u32_t fec_buff_num;
  16. /*begin for first time init or dynamic update*/
  17. extern int g_fec_data_num;
  18. extern int g_fec_redundant_num;
  19. extern int g_fec_mtu;
  20. extern int g_fec_queue_len;
  21. extern int g_fec_timeout; //8ms
  22. extern int g_fec_mode;
  23. extern int dynamic_update_fec;
  24. /*end for first time init or dynamic update*/
  25. struct anti_replay_t
  26. {
  27. u64_t replay_buffer[anti_replay_buff_size];
  28. unordered_set<u32_t> st;
  29. int index;
  30. anti_replay_t()
  31. {
  32. clear();
  33. }
  34. int clear()
  35. {
  36. memset(replay_buffer,-1,sizeof(replay_buffer));
  37. st.clear();
  38. st.rehash(anti_replay_buff_size*3);
  39. index=0;
  40. return 0;
  41. }
  42. void set_invaild(u32_t seq)
  43. {
  44. if(st.find(seq)!=st.end() )
  45. {
  46. mylog(log_trace,"seq %u exist\n",seq);
  47. return;
  48. //return 0;
  49. }
  50. if(replay_buffer[index]!=u64_t(i64_t(-1)))
  51. {
  52. assert(st.find(replay_buffer[index])!=st.end());
  53. st.erase(replay_buffer[index]);
  54. }
  55. replay_buffer[index]=seq;
  56. st.insert(seq);
  57. index++;
  58. if(index==int(anti_replay_buff_size)) index=0;
  59. //return 1; //for complier check
  60. }
  61. int is_vaild(u32_t seq)
  62. {
  63. return st.find(seq)==st.end();
  64. }
  65. };
  66. struct blob_encode_t
  67. {
  68. char input_buf[(max_fec_packet_num+5)*buf_len];
  69. int current_len;
  70. int counter;
  71. char *output_buf[max_fec_packet_num+100];
  72. blob_encode_t();
  73. int clear();
  74. int get_num();
  75. int get_shard_len(int n);
  76. int get_shard_len(int n,int next_packet_len);
  77. int input(char *s,int len); //len=use len=0 for second and following packet
  78. int output(int n,char ** &s_arr,int & len);
  79. };
  80. struct blob_decode_t
  81. {
  82. char input_buf[(max_fec_packet_num+5)*buf_len];
  83. int current_len;
  84. int last_len;
  85. int counter;
  86. char *output_buf[max_blob_packet_num+100];
  87. int output_len[max_blob_packet_num+100];
  88. blob_decode_t();
  89. int clear();
  90. int input(char *input,int len);
  91. int output(int &n,char ** &output,int *&len_arr);
  92. };
  93. class fec_encode_manager_t
  94. {
  95. private:
  96. u32_t seq;
  97. int fec_mode;
  98. int fec_data_num,fec_redundant_num;
  99. int fec_mtu;
  100. int fec_queue_len;
  101. int fec_timeout;
  102. my_time_t first_packet_time;
  103. my_time_t first_packet_time_for_output;
  104. blob_encode_t blob_encode;
  105. char input_buf[max_fec_packet_num+5][buf_len];
  106. int input_len[max_fec_packet_num+100];
  107. char *output_buf[max_fec_packet_num+100];
  108. int output_len[max_fec_packet_num+100];
  109. int counter;
  110. //int timer_fd;
  111. //u64_t timer_fd64;
  112. int ready_for_output;
  113. u32_t output_n;
  114. int append(char *s,int len);
  115. ev_timer timer;
  116. struct ev_loop *loop=0;
  117. void (*cb) (struct ev_loop *loop, struct ev_timer *watcher, int revents)=0;
  118. public:
  119. fec_encode_manager_t();
  120. ~fec_encode_manager_t();
  121. void set_data(void * data)
  122. {
  123. timer.data=data;
  124. }
  125. void set_loop_and_cb(struct ev_loop *loop,void (*cb) (struct ev_loop *loop, struct ev_timer *watcher, int revents))
  126. {
  127. this->loop=loop;
  128. this->cb=cb;
  129. ev_init(&timer,cb);
  130. }
  131. int clear()
  132. {
  133. counter=0;
  134. blob_encode.clear();
  135. ready_for_output=0;
  136. //itimerspec zero_its;
  137. //memset(&zero_its, 0, sizeof(zero_its));
  138. //timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
  139. if(loop)
  140. {
  141. ev_timer_stop(loop,&timer);
  142. loop=0;
  143. cb=0;
  144. }
  145. seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug.
  146. return 0;
  147. }
  148. my_time_t get_first_packet_time()
  149. {
  150. return first_packet_time_for_output;
  151. }
  152. int get_pending_time()
  153. {
  154. return fec_timeout;
  155. }
  156. int get_type()
  157. {
  158. return fec_mode;
  159. }
  160. //u64_t get_timer_fd64();
  161. int reset_fec_parameter(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type);
  162. int input(char *s,int len/*,int &is_first_packet*/);
  163. int output(int &n,char ** &s_arr,int *&len);
  164. };
  165. struct fec_data_t
  166. {
  167. int used;
  168. u32_t seq;
  169. int type;
  170. int data_num;
  171. int redundant_num;
  172. int idx;
  173. char buf[buf_len];
  174. int len;
  175. };
  176. struct fec_group_t
  177. {
  178. int type=-1;
  179. int data_num=-1;
  180. int redundant_num=-1;
  181. int len=-1;
  182. //int data_counter=0;
  183. map<int,int> group_mp;
  184. };
  185. class fec_decode_manager_t
  186. {
  187. anti_replay_t anti_replay;
  188. fec_data_t *fec_data;
  189. unordered_map<u32_t, fec_group_t> mp;
  190. blob_decode_t blob_decode;
  191. int index;
  192. int output_n;
  193. char ** output_s_arr;
  194. int * output_len_arr;
  195. int ready_for_output;
  196. char *output_s_arr_buf[max_fec_packet_num+100];//only for type=1,for type=0 the buf inside blot_t is used
  197. int output_len_arr_buf[max_fec_packet_num+100];//same
  198. public:
  199. fec_decode_manager_t()
  200. {
  201. fec_data=new fec_data_t[fec_buff_num+5];
  202. clear();
  203. }
  204. fec_decode_manager_t(const fec_decode_manager_t &b)
  205. {
  206. assert(0==1);//not allowed to copy
  207. }
  208. ~fec_decode_manager_t()
  209. {
  210. delete fec_data;
  211. }
  212. int clear()
  213. {
  214. anti_replay.clear();
  215. mp.clear();
  216. mp.rehash(fec_buff_num*3);
  217. for(int i=0;i<(int)fec_buff_num;i++)
  218. fec_data[i].used=0;
  219. ready_for_output=0;
  220. index=0;
  221. return 0;
  222. }
  223. //int re_init();
  224. int input(char *s,int len);
  225. int output(int &n,char ** &s_arr,int* &len_arr);
  226. };
  227. #endif /* FEC_MANAGER_H_ */