fec_manager.cpp 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  /*
   * fec_manager.cpp
   *
   *  Created on: Sep 27, 2017
   *      Author: root
   */
  7. #include "fec_manager.h"
  8. #include "log.h"
  9. #include "common.h"
  10. #include "lib/rs.h"
  11. #include "fd_manager.h"
  12. blob_encode_t::blob_encode_t()
  13. {
  14. clear();
  15. }
  16. int blob_encode_t::clear()
  17. {
  18. counter=0;
  19. current_len=(int)sizeof(u32_t);
  20. return 0;
  21. }
  22. int blob_encode_t::get_num()
  23. {
  24. return counter;
  25. }
  26. int blob_encode_t::get_shard_len(int n)
  27. {
  28. return round_up_div(current_len,n);
  29. }
  30. int blob_encode_t::get_shard_len(int n,int next_packet_len)
  31. {
  32. return round_up_div(current_len+(int)sizeof(u16_t)+next_packet_len,n);
  33. }
  34. int blob_encode_t::input(char *s,int len)
  35. {
  36. assert(current_len+len+sizeof(u16_t) <=max_fec_packet_num*buf_len);
  37. assert(len<=65535&&len>=0);
  38. counter++;
  39. assert(counter<=max_normal_packet_num);
  40. write_u16(buf+current_len,len);
  41. current_len+=sizeof(u16_t);
  42. memcpy(buf+current_len,s,len);
  43. current_len+=len;
  44. return 0;
  45. }
  46. int blob_encode_t::output(int n,char ** &s_arr,int & len)
  47. {
  48. len=round_up_div(current_len,n);
  49. write_u32(buf,counter);
  50. for(int i=0;i<n;i++)
  51. {
  52. output_arr[i]=buf+len*i;
  53. }
  54. s_arr=output_arr;
  55. return 0;
  56. }
  57. blob_decode_t::blob_decode_t()
  58. {
  59. clear();
  60. }
  61. int blob_decode_t::clear()
  62. {
  63. current_len=0;
  64. last_len=-1;
  65. counter=0;
  66. return 0;
  67. }
  68. int blob_decode_t::input(char *s,int len)
  69. {
  70. if(last_len!=-1)
  71. {
  72. assert(last_len==len);
  73. }
  74. counter++;
  75. assert(counter<=max_fec_packet_num);
  76. last_len=len;
  77. assert(current_len+len+100<(int)sizeof(buf));
  78. memcpy(buf+current_len,s,len);
  79. current_len+=len;
  80. return 0;
  81. }
  82. int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr)
  83. {
  84. int parser_pos=0;
  85. if(parser_pos+(int)sizeof(u32_t)>current_len) return -1;
  86. n=(int)read_u32(buf+parser_pos);
  87. if(n>max_normal_packet_num) {mylog(log_info,"failed 1\n");return -1;}
  88. s_arr=s_buf;
  89. len_arr=len_buf;
  90. parser_pos+=sizeof(u32_t);
  91. for(int i=0;i<n;i++)
  92. {
  93. if(parser_pos+(int)sizeof(u16_t)>current_len) {mylog(log_info,"failed2 \n");return -1;}
  94. len_arr[i]=(int)read_u16(buf+parser_pos);
  95. parser_pos+=(int)sizeof(u16_t);
  96. if(parser_pos+len_arr[i]>current_len) {mylog(log_info,"failed 3 %d %d %d\n",parser_pos,len_arr[i],current_len);return -1;}
  97. s_arr[i]=buf+parser_pos;
  98. parser_pos+=len_arr[i];
  99. }
  100. return 0;
  101. }
  102. fec_encode_manager_t::fec_encode_manager_t()
  103. {
  104. //int timer_fd;
  105. if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
  106. {
  107. mylog(log_fatal,"timer_fd create error");
  108. myexit(1);
  109. }
  110. timer_fd64=fd_manager.create(timer_fd);
  111. re_init(4,2,1200,100,10000);
  112. }
  113. fec_encode_manager_t::~fec_encode_manager_t()
  114. {
  115. fd_manager.fd64_close(timer_fd64);
  116. }
  117. u64_t fec_encode_manager_t::get_timer_fd64()
  118. {
  119. return timer_fd64;
  120. }
  121. int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time)
  122. {
  123. fec_data_num=data_num;
  124. fec_redundant_num=redundant_num;
  125. fec_mtu=mtu;
  126. fec_pending_num=pending_num;
  127. fec_pending_time=pending_time;
  128. counter=0;
  129. blob_encode.clear();
  130. ready_for_output=0;
  131. seq=0;
  132. itimerspec zero_its;
  133. memset(&zero_its, 0, sizeof(zero_its));
  134. timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
  135. return 0;
  136. }
  137. int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
  138. {
  139. if(s==0 ||blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu||counter>=fec_pending_num)
  140. {
  141. char ** blob_output;
  142. int blob_len;
  143. mylog(log_debug,"counter=%d\n",counter);
  144. if(counter==0)
  145. {
  146. if(s==0) return 0;//relax this restriction temporarily
  147. else mylog(log_warn,"message too long,ignored\n");
  148. }
  149. blob_encode.output(fec_data_num,blob_output,blob_len);
  150. for(int i=0;i<fec_data_num+fec_redundant_num;i++)
  151. {
  152. int tmp_idx=0;
  153. write_u32(buf[i]+tmp_idx,seq);
  154. tmp_idx+=sizeof(u32_t);
  155. buf[i][tmp_idx++]=(unsigned char)0;
  156. buf[i][tmp_idx++]=(unsigned char)fec_data_num;
  157. buf[i][tmp_idx++]=(unsigned char)fec_redundant_num;
  158. buf[i][tmp_idx++]=(unsigned char)i;
  159. output_buf[i]=buf[i]+tmp_idx; //////caution ,trick here.
  160. output_len[i]=tmp_idx+blob_len;
  161. if(i<fec_data_num)
  162. {
  163. memcpy(buf[i]+tmp_idx,blob_output[i],blob_len);
  164. tmp_idx+=blob_len;
  165. }
  166. }
  167. //output_len=blob_len+sizeof(u32_t)+4*sizeof(char);/////remember to change this 4,if modified the protocol
  168. rs_encode2(fec_data_num,fec_data_num+fec_redundant_num,output_buf,blob_len);
  169. for(int i=0;i<fec_data_num+fec_redundant_num;i++)
  170. {
  171. output_buf[i]=buf[i];//////caution ,trick here.
  172. }
  173. ready_for_output=1;
  174. seq++;
  175. counter=0;
  176. blob_encode.clear();
  177. }
  178. if(s!=0)
  179. {
  180. if(counter==0)
  181. {
  182. itimerspec its;
  183. memset(&its.it_interval,0,sizeof(its.it_interval));
  184. my_time_t tmp_time=fec_pending_time+get_current_time_us();
  185. its.it_value.tv_sec=tmp_time/1000000llu;
  186. its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu;
  187. timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
  188. }
  189. blob_encode.input(s,len);
  190. counter++;
  191. }
  192. return 0;
  193. }
  194. int fec_encode_manager_t::output(int &n,char ** &s_arr,int *&len)
  195. {
  196. if(!ready_for_output)
  197. {
  198. n=-1;
  199. len=0;
  200. s_arr=0;
  201. }
  202. else
  203. {
  204. n=fec_data_num+fec_redundant_num;
  205. len=output_len;
  206. s_arr=output_buf;
  207. ready_for_output=0;
  208. }
  209. return 0;
  210. }
  211. fec_decode_manager_t::fec_decode_manager_t()
  212. {
  213. re_init();
  214. }
  215. int fec_decode_manager_t::re_init()
  216. {
  217. for(int i=0;i<(int)fec_buff_size;i++)
  218. fec_data[i].used=0;
  219. ready_for_output=0;
  220. return 0;
  221. }
  222. int fec_decode_manager_t::input(char *s,int len)
  223. {
  224. assert(s!=0);
  225. int tmp_idx=0;
  226. u32_t seq=read_u32(s+tmp_idx);
  227. tmp_idx+=sizeof(u32_t);
  228. int type=(unsigned char)s[tmp_idx++];
  229. int data_num=(unsigned char)s[tmp_idx++];
  230. int redundant_num=(unsigned char)s[tmp_idx++];
  231. int inner_index=(unsigned char)s[tmp_idx++];
  232. len=len-tmp_idx;
  233. if(len<0)
  234. {
  235. return -1;
  236. }
  237. if(data_num+redundant_num>max_fec_packet_num)
  238. {
  239. return -1;
  240. }
  241. if(!anti_replay.is_vaild(seq))
  242. {
  243. return 0;
  244. }
  245. if(!mp[seq].empty())
  246. {
  247. int first_idx=mp[seq].begin()->second;
  248. int ok=1;
  249. if(fec_data[first_idx].data_num!=data_num)
  250. ok=0;
  251. if(fec_data[first_idx].redundant_num!=redundant_num)
  252. ok=0;
  253. if(fec_data[first_idx].len!=len)
  254. ok=0;
  255. if(ok==0)
  256. {
  257. return 0;
  258. }
  259. }
  260. if(fec_data[index].used!=0)
  261. {
  262. int tmp_seq=fec_data[index].seq;
  263. anti_replay.set_invaild(tmp_seq);
  264. if(mp.find(tmp_seq)!=mp.end())
  265. {
  266. mp.erase(tmp_seq);
  267. }
  268. }
  269. fec_data[index].used=1;
  270. fec_data[index].seq=seq;
  271. fec_data[index].type=type;
  272. fec_data[index].data_num=data_num;
  273. fec_data[index].redundant_num=redundant_num;
  274. fec_data[index].idx=inner_index;
  275. fec_data[index].len=len;
  276. memcpy(fec_data[index].buf,s+tmp_idx,len);
  277. mp[seq][inner_index]=index;
  278. index++;
  279. if(index==int(anti_replay_buff_size)) index=0;
  280. map<int,int> &inner_mp=mp[seq];
  281. assert((int)inner_mp.size()<=data_num);
  282. if((int)inner_mp.size()==data_num)
  283. {
  284. char *fec_tmp_arr[max_fec_packet_num+5]={0};
  285. for(auto it=inner_mp.begin();it!=inner_mp.end();it++)
  286. {
  287. fec_tmp_arr[it->first]=fec_data[it->second].buf;
  288. }
  289. rs_decode2(data_num,data_num+redundant_num,fec_tmp_arr,len); //the input data has been modified in-place
  290. blob_decode.clear();
  291. for(int i=0;i<data_num;i++)
  292. {
  293. blob_decode.input(fec_tmp_arr[i],len);
  294. }
  295. blob_decode.output(output_n,output_s_arr,output_len_arr);
  296. ready_for_output=1;
  297. anti_replay.set_invaild(seq);
  298. }
  299. return 0;
  300. }
  301. int fec_decode_manager_t::output(int &n,char ** &s_arr,int* &len_arr)
  302. {
  303. if(!ready_for_output)
  304. {
  305. n=-1;
  306. s_arr=0;
  307. len_arr=0;
  308. }
  309. else
  310. {
  311. ready_for_output=0;
  312. n=output_n;
  313. s_arr=output_s_arr;
  314. len_arr=output_len_arr;
  315. }
  316. return 0;
  317. }