fec_manager.h 4.7 KB


  1. /*
  2. * fec_manager.h
  3. *
  4. * Created on: Sep 27, 2017
  5. * Author: root
  6. */
  7. #ifndef FEC_MANAGER_H_
  8. #define FEC_MANAGER_H_
  9. #include "common.h"
  10. #include "log.h"
  11. #include "lib/rs.h"
  12. const int max_blob_packet_num=30000;//how many packet can be contain in a blob_t ,can be set very large
  13. const u32_t anti_replay_buff_size=30000;//can be set very large
  14. const int max_fec_packet_num=255;// this is the limitation of the rs lib
  15. extern u32_t fec_buff_num;
  16. /*begin for first time init or dynamic update*/
  17. extern int g_fec_data_num;
  18. extern int g_fec_redundant_num;
  19. extern int g_fec_mtu;
  20. extern int g_fec_queue_len;
  21. extern int g_fec_timeout; //8ms
  22. extern int g_fec_mode;
  23. extern int dynamic_update_fec;
  24. /*end for first time init or dynamic update*/
  25. struct anti_replay_t
  26. {
  27. u64_t replay_buffer[anti_replay_buff_size];
  28. unordered_set<u32_t> st;
  29. int index;
  30. anti_replay_t()
  31. {
  32. clear();
  33. }
  34. int clear()
  35. {
  36. memset(replay_buffer,-1,sizeof(replay_buffer));
  37. st.clear();
  38. st.rehash(anti_replay_buff_size*3);
  39. index=0;
  40. return 0;
  41. }
  42. void set_invaild(u32_t seq)
  43. {
  44. if(st.find(seq)!=st.end() )
  45. {
  46. mylog(log_trace,"seq %u exist\n",seq);
  47. return;
  48. //return 0;
  49. }
  50. if(replay_buffer[index]!=u64_t(i64_t(-1)))
  51. {
  52. assert(st.find(replay_buffer[index])!=st.end());
  53. st.erase(replay_buffer[index]);
  54. }
  55. replay_buffer[index]=seq;
  56. st.insert(seq);
  57. index++;
  58. if(index==int(anti_replay_buff_size)) index=0;
  59. //return 1; //for complier check
  60. }
  61. int is_vaild(u32_t seq)
  62. {
  63. return st.find(seq)==st.end();
  64. }
  65. };
  66. struct blob_encode_t
  67. {
  68. char input_buf[(max_fec_packet_num+5)*buf_len];
  69. int current_len;
  70. int counter;
  71. char *output_buf[max_fec_packet_num+100];
  72. blob_encode_t();
  73. int clear();
  74. int get_num();
  75. int get_shard_len(int n);
  76. int get_shard_len(int n,int next_packet_len);
  77. int input(char *s,int len); //len=use len=0 for second and following packet
  78. int output(int n,char ** &s_arr,int & len);
  79. };
  80. struct blob_decode_t
  81. {
  82. char input_buf[(max_fec_packet_num+5)*buf_len];
  83. int current_len;
  84. int last_len;
  85. int counter;
  86. char *output_buf[max_blob_packet_num+100];
  87. int output_len[max_blob_packet_num+100];
  88. blob_decode_t();
  89. int clear();
  90. int input(char *input,int len);
  91. int output(int &n,char ** &output,int *&len_arr);
  92. };
  93. class fec_encode_manager_t
  94. {
  95. private:
  96. u32_t seq;
  97. int fec_mode;
  98. int fec_data_num,fec_redundant_num;
  99. int fec_mtu;
  100. int fec_queue_len;
  101. int fec_timeout;
  102. my_time_t first_packet_time;
  103. my_time_t first_packet_time_for_output;
  104. blob_encode_t blob_encode;
  105. char input_buf[max_fec_packet_num+5][buf_len];
  106. int input_len[max_fec_packet_num+100];
  107. char *output_buf[max_fec_packet_num+100];
  108. int output_len[max_fec_packet_num+100];
  109. int counter;
  110. int timer_fd;
  111. u64_t timer_fd64;
  112. int ready_for_output;
  113. u32_t output_n;
  114. int append(char *s,int len);
  115. public:
  116. fec_encode_manager_t();
  117. ~fec_encode_manager_t();
  118. int clear()
  119. {
  120. counter=0;
  121. blob_encode.clear();
  122. ready_for_output=0;
  123. itimerspec zero_its;
  124. memset(&zero_its, 0, sizeof(zero_its));
  125. timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
  126. seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug.
  127. return 0;
  128. }
  129. my_time_t get_first_packet_time()
  130. {
  131. return first_packet_time_for_output;
  132. }
  133. int get_pending_time()
  134. {
  135. return fec_timeout;
  136. }
  137. int get_type()
  138. {
  139. return fec_mode;
  140. }
  141. u64_t get_timer_fd64();
  142. int reset_fec_parameter(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type);
  143. int input(char *s,int len/*,int &is_first_packet*/);
  144. int output(int &n,char ** &s_arr,int *&len);
  145. };
  146. struct fec_data_t
  147. {
  148. int used;
  149. u32_t seq;
  150. int type;
  151. int data_num;
  152. int redundant_num;
  153. int idx;
  154. char buf[buf_len];
  155. int len;
  156. };
  157. struct fec_group_t
  158. {
  159. int type=-1;
  160. int data_num=-1;
  161. int redundant_num=-1;
  162. int len=-1;
  163. //int data_counter=0;
  164. map<int,int> group_mp;
  165. };
  166. class fec_decode_manager_t
  167. {
  168. anti_replay_t anti_replay;
  169. fec_data_t *fec_data;
  170. unordered_map<u32_t, fec_group_t> mp;
  171. blob_decode_t blob_decode;
  172. int index;
  173. int output_n;
  174. char ** output_s_arr;
  175. int * output_len_arr;
  176. int ready_for_output;
  177. char *output_s_arr_buf[max_fec_packet_num+100];//only for type=1,for type=0 the buf inside blot_t is used
  178. int output_len_arr_buf[max_fec_packet_num+100];//same
  179. public:
  180. fec_decode_manager_t()
  181. {
  182. fec_data=new fec_data_t[fec_buff_num+5];
  183. clear();
  184. }
  185. fec_decode_manager_t(const fec_decode_manager_t &b)
  186. {
  187. assert(0==1);//not allowed to copy
  188. }
  189. ~fec_decode_manager_t()
  190. {
  191. delete fec_data;
  192. }
  193. int clear()
  194. {
  195. anti_replay.clear();
  196. mp.clear();
  197. mp.rehash(fec_buff_num*3);
  198. for(int i=0;i<(int)fec_buff_num;i++)
  199. fec_data[i].used=0;
  200. ready_for_output=0;
  201. index=0;
  202. return 0;
  203. }
  204. //int re_init();
  205. int input(char *s,int len);
  206. int output(int &n,char ** &s_arr,int* &len_arr);
  207. };
  208. #endif /* FEC_MANAGER_H_ */