fec_manager.h 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. /*
  2. * fec_manager.h
  3. *
  4. * Created on: Sep 27, 2017
  5. * Author: root
  6. */
  7. #ifndef FEC_MANAGER_H_
  8. #define FEC_MANAGER_H_
  9. #include "common.h"
  10. #include "log.h"
  11. #include "lib/rs.h"
  12. const int max_packet_num=1000;
  13. const u32_t anti_replay_buff_size=30000;
  14. const u32_t fec_buff_size=3000;
  15. struct anti_replay_t
  16. {
  17. u64_t replay_buffer[anti_replay_buff_size];
  18. unordered_set<u32_t> st;
  19. int index;
  20. anti_replay_t()
  21. {
  22. memset(replay_buffer,-1,sizeof(replay_buffer));
  23. st.rehash(anti_replay_buff_size*10);
  24. index=0;
  25. }
  26. void set_invaild(u32_t seq)
  27. {
  28. if(st.find(seq)!=st.end() )
  29. {
  30. mylog(log_trace,"seq %llx exist\n",seq);
  31. return;
  32. //return 0;
  33. }
  34. if(replay_buffer[index]!=u64_t(i64_t(-1)))
  35. {
  36. assert(st.find(replay_buffer[index])!=st.end());
  37. st.erase(replay_buffer[index]);
  38. }
  39. replay_buffer[index]=seq;
  40. st.insert(seq);
  41. index++;
  42. if(index==int(anti_replay_buff_size)) index=0;
  43. //return 1; //for complier check
  44. }
  45. int is_vaild(u32_t seq)
  46. {
  47. return st.find(seq)==st.end();
  48. }
  49. };
  50. struct blob_encode_t
  51. {
  52. char buf[(256+5)*buf_len];
  53. int current_len;
  54. int counter;
  55. blob_encode_t();
  56. int clear();
  57. int get_num();
  58. int get_shard_len(int n);
  59. int get_shard_len(int n,int next_packet_len);
  60. int input(char *s,int len); //len=use len=0 for second and following packet
  61. int output(int n,char ** &s_arr,int & len);
  62. };
  63. struct blob_decode_t
  64. {
  65. char buf[(256+5)*buf_len];
  66. int current_len;
  67. int last_len;
  68. int counter;
  69. blob_decode_t();
  70. int clear();
  71. int input(char *input,int len);
  72. int output(int &n,char ** &output,int *&len_arr);
  73. };
  74. struct fec_encode_manager_t
  75. {
  76. int fec_data_num,fec_redundant_num;
  77. int fec_mtu;
  78. char buf[256+5][buf_len+100];
  79. char *output_buf[256+5];
  80. int output_len;
  81. int ready_for_output;
  82. int counter;
  83. blob_encode_t blob_encode;
  84. fec_encode_manager_t();
  85. int re_init();
  86. int input(char *s,int len,int &is_first_packet);
  87. int output(int &n,char ** &s_arr,int &len);
  88. };
  89. struct fec_data_t
  90. {
  91. int used;
  92. u32_t seq;
  93. int data_num;
  94. int redundant_num;
  95. int idx;
  96. int type;
  97. char buf[buf_len];
  98. int len;
  99. };
  100. struct fec_decode_manager_t
  101. {
  102. anti_replay_t anti_replay;
  103. fec_data_t fec_data[fec_buff_size];
  104. int index;
  105. unordered_map<u32_t, map<int,int> > mp;
  106. blob_decode_t blob_decode;
  107. fec_decode_manager_t()
  108. {
  109. for(int i=0;i<(int)fec_buff_size;i++)
  110. fec_data[i].used=0;
  111. ready_for_output=0;
  112. }
  113. int output_n;
  114. char ** output_s_arr;
  115. int * output_len_arr;
  116. int ready_for_output;
  117. int input(char *s,int len)
  118. {
  119. char *ori_s=s;
  120. u32_t seq=read_u32(s);
  121. s+=sizeof(u32_t);
  122. int data_num=(unsigned char)*(s++);
  123. int redundant_num=(unsigned char)*(s++);
  124. int innder_index=(unsigned char)*(s++);
  125. int type=(unsigned char)*(s++);
  126. len=len-int(s-ori_s);
  127. if(len<0)
  128. {
  129. return -1;
  130. }
  131. if(!anti_replay.is_vaild(seq))
  132. {
  133. return 0;
  134. }
  135. if(!mp[seq].empty())
  136. {
  137. int tmp_idx=mp[seq].begin()->second;
  138. int ok=1;
  139. if(data_num+redundant_num>255)
  140. ok=0;
  141. if(fec_data[tmp_idx].data_num!=data_num||fec_data[tmp_idx].redundant_num!=redundant_num||fec_data[tmp_idx].len!=len)
  142. {
  143. ok=0;
  144. }
  145. if(ok==0)
  146. {
  147. return 0;
  148. }
  149. }
  150. if(fec_data[index].used!=0)
  151. {
  152. int tmp_seq=fec_data[index].seq;
  153. anti_replay.set_invaild(tmp_seq);
  154. if(mp.find(tmp_seq)!=mp.end())
  155. {
  156. mp.erase(tmp_seq);
  157. }
  158. }
  159. fec_data[index].used=1;
  160. fec_data[index].seq=seq;
  161. fec_data[index].data_num=data_num;
  162. fec_data[index].redundant_num=redundant_num;
  163. fec_data[index].idx=innder_index;
  164. fec_data[index].type=type;
  165. fec_data[index].len=len;
  166. mp[seq][innder_index]=index;
  167. map<int,int> &inner_mp=mp[seq];
  168. if((int)inner_mp.size()>=data_num)
  169. {
  170. anti_replay.set_invaild(seq);
  171. char *fec_tmp_arr[256+5]={0};
  172. for(auto it=inner_mp.begin();it!=inner_mp.end();it++)
  173. {
  174. fec_tmp_arr[it->first]=fec_data[it->second].buf;
  175. }
  176. rs_decode2(data_num,redundant_num,fec_tmp_arr,len);
  177. blob_decode.clear();
  178. for(int i=0;i<data_num;i++)
  179. {
  180. blob_decode.input(fec_tmp_arr[i],len);
  181. }
  182. blob_decode.output(output_n,output_s_arr,output_len_arr);
  183. ready_for_output=1;
  184. }
  185. index++;
  186. if(index==int(anti_replay_buff_size)) index=0;
  187. return 0;
  188. }
  189. int output(int &n,char ** &s_arr,int* &len_arr)
  190. {
  191. if(!ready_for_output)
  192. {
  193. n=-1;
  194. s_arr=0;
  195. len_arr=0;
  196. }
  197. else
  198. {
  199. ready_for_output=0;
  200. n=output_n;
  201. s_arr=output_s_arr;
  202. len_arr=output_len_arr;
  203. }
  204. return 0;
  205. }
  206. };
  207. #endif /* FEC_MANAGER_H_ */