fec_manager.cpp 16 KB


  1. /*
  2. * fec_manager.cpp
  3. *
  4. * Created on: Sep 27, 2017
  5. * Author: root
  6. */
  7. #include "fec_manager.h"
  8. #include "log.h"
  9. #include "common.h"
  10. #include "lib/rs.h"
  11. #include "fd_manager.h"
  // Fast-send switches, only consulted for FEC type 1 (non-zero = enabled).
  //
  // encode_fast_send: fec_encode_manager_t::input() hands out every type-1
  // data packet as soon as it is appended; when the group is closed only the
  // last data packet (if any) and the redundant packets are emitted.
  const int encode_fast_send = 1;

  // decode_fast_send: fec_decode_manager_t::input() delivers a type-1 data
  // packet immediately on arrival; after a recovery only the packets that
  // were missing plus the current one are delivered.
  const int decode_fast_send = 1;
  14. blob_encode_t::blob_encode_t()
  15. {
  16. clear();
  17. }
  18. int blob_encode_t::clear()
  19. {
  20. counter=0;
  21. current_len=(int)sizeof(u32_t);
  22. return 0;
  23. }
  24. int blob_encode_t::get_num()
  25. {
  26. return counter;
  27. }
  28. int blob_encode_t::get_shard_len(int n)
  29. {
  30. return round_up_div(current_len,n);
  31. }
  32. int blob_encode_t::get_shard_len(int n,int next_packet_len)
  33. {
  34. return round_up_div(current_len+(int)sizeof(u16_t)+next_packet_len,n);
  35. }
  36. int blob_encode_t::input(char *s,int len)
  37. {
  38. assert(current_len+len+sizeof(u16_t) <=max_fec_packet_num*buf_len);
  39. assert(len<=65535&&len>=0);
  40. counter++;
  41. assert(counter<=max_normal_packet_num);
  42. write_u16(buf+current_len,len);
  43. current_len+=sizeof(u16_t);
  44. memcpy(buf+current_len,s,len);
  45. current_len+=len;
  46. return 0;
  47. }
  48. int blob_encode_t::output(int n,char ** &s_arr,int & len)
  49. {
  50. len=round_up_div(current_len,n);
  51. write_u32(buf,counter);
  52. for(int i=0;i<n;i++)
  53. {
  54. output_arr[i]=buf+len*i;
  55. }
  56. s_arr=output_arr;
  57. return 0;
  58. }
  59. blob_decode_t::blob_decode_t()
  60. {
  61. clear();
  62. }
  63. int blob_decode_t::clear()
  64. {
  65. current_len=0;
  66. last_len=-1;
  67. counter=0;
  68. return 0;
  69. }
  70. int blob_decode_t::input(char *s,int len)
  71. {
  72. if(last_len!=-1)
  73. {
  74. assert(last_len==len);
  75. }
  76. counter++;
  77. assert(counter<=max_fec_packet_num);
  78. last_len=len;
  79. assert(current_len+len+100<(int)sizeof(buf));
  80. memcpy(buf+current_len,s,len);
  81. current_len+=len;
  82. return 0;
  83. }
  84. int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr)
  85. {
  86. int parser_pos=0;
  87. if(parser_pos+(int)sizeof(u32_t)>current_len) return -1;
  88. n=(int)read_u32(buf+parser_pos);
  89. if(n>max_normal_packet_num) {mylog(log_info,"failed 1\n");return -1;}
  90. s_arr=s_buf;
  91. len_arr=len_buf;
  92. parser_pos+=sizeof(u32_t);
  93. for(int i=0;i<n;i++)
  94. {
  95. if(parser_pos+(int)sizeof(u16_t)>current_len) {mylog(log_info,"failed2 \n");return -1;}
  96. len_arr[i]=(int)read_u16(buf+parser_pos);
  97. parser_pos+=(int)sizeof(u16_t);
  98. if(parser_pos+len_arr[i]>current_len) {mylog(log_info,"failed 3 %d %d %d\n",parser_pos,len_arr[i],current_len);return -1;}
  99. s_arr[i]=buf+parser_pos;
  100. parser_pos+=len_arr[i];
  101. }
  102. return 0;
  103. }
  104. fec_encode_manager_t::fec_encode_manager_t()
  105. {
  106. //int timer_fd;
  107. if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
  108. {
  109. mylog(log_fatal,"timer_fd create error");
  110. myexit(1);
  111. }
  112. timer_fd64=fd_manager.create(timer_fd);
  113. re_init(4,2,1200,100,10000,0);
  114. }
  115. fec_encode_manager_t::~fec_encode_manager_t()
  116. {
  117. fd_manager.fd64_close(timer_fd64);
  118. }
  119. u64_t fec_encode_manager_t::get_timer_fd64()
  120. {
  121. return timer_fd64;
  122. }
  123. int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type)
  124. {
  125. fec_data_num=data_num;
  126. fec_redundant_num=redundant_num;
  127. fec_mtu=mtu;
  128. fec_pending_num=pending_num;
  129. fec_pending_time=pending_time;
  130. this->type=type;
  131. counter=0;
  132. blob_encode.clear();
  133. ready_for_output=0;
  134. seq=0;
  135. itimerspec zero_its;
  136. memset(&zero_its, 0, sizeof(zero_its));
  137. timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
  138. return 0;
  139. }
  140. int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/)
  141. {
  142. if(counter==0)
  143. {
  144. itimerspec its;
  145. memset(&its.it_interval,0,sizeof(its.it_interval));
  146. my_time_t tmp_time=fec_pending_time+get_current_time_us();
  147. its.it_value.tv_sec=tmp_time/1000000llu;
  148. its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu;
  149. timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
  150. }
  151. if(type==0)
  152. {
  153. blob_encode.input(s,len);
  154. }
  155. else if(type==1)
  156. {
  157. mylog(log_info,"counter=%d\n",counter);
  158. assert(len<=65535&&len>=0);
  159. char * p=buf[counter]+sizeof(u32_t)+4*sizeof(char);
  160. write_u16(p,(u16_t)((u32_t)len));
  161. p+=sizeof(u16_t);
  162. memcpy(p,s,len);//remember to change this,if protocol is modified
  163. buf_s_len[counter]=len+sizeof(u16_t);
  164. }
  165. else
  166. {
  167. assert(0==1);
  168. }
  169. counter++;
  170. return 0;
  171. }
  172. int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
  173. {
  174. int about_to_fec=0;
  175. int delayed_append=0;
  176. //int counter_back=counter;
  177. if(type==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu)
  178. {
  179. mylog(log_warn,"message too long len=%d,ignored\n",len);
  180. return -1;
  181. }
  182. if(type==1&&s!=0&&len>=fec_mtu)
  183. {
  184. mylog(log_warn,"message too long len=%d,ignored\n",len);
  185. return -1;
  186. }
  187. if(s==0) about_to_fec=1;//now
  188. assert(type==0||type==1);
  189. if(type==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet
  190. if(type==0) assert(counter<fec_pending_num);
  191. if(type==1) assert(counter<fec_data_num);
  192. if(s!=0&&!delayed_append)
  193. {
  194. append(s,len);
  195. }
  196. if(type==0&& counter==fec_pending_num) {about_to_fec=1;} //
  197. if(type==1&& counter==fec_data_num) about_to_fec=1;
  198. if(about_to_fec)
  199. {
  200. char ** blob_output;
  201. int fec_len=-1;
  202. mylog(log_debug,"counter=%d\n",counter);
  203. if(counter==0)
  204. {
  205. mylog(log_warn,"unexpected counter==0\n");
  206. return -1;
  207. }
  208. int actual_data_num;
  209. int actual_redundant_num;
  210. if(type==0)
  211. {
  212. actual_data_num=fec_data_num;
  213. actual_redundant_num=fec_redundant_num;
  214. blob_encode.output(actual_data_num,blob_output,fec_len);
  215. }
  216. else
  217. {
  218. actual_data_num=counter;
  219. actual_redundant_num=fec_redundant_num;
  220. for(int i=0;i<counter;i++)
  221. {
  222. assert(buf_s_len[i]>=0);
  223. if(buf_s_len[i]>fec_len) fec_len=buf_s_len[i];
  224. }
  225. }
  226. mylog(log_trace,"%d %d %d\n",actual_data_num,actual_redundant_num,fec_len);
  227. char *tmp_output_buf[max_fec_packet_num+5]={0};
  228. for(int i=0;i<actual_data_num+actual_redundant_num;i++)
  229. {
  230. int tmp_idx=0;
  231. write_u32(buf[i] + tmp_idx, seq);
  232. tmp_idx += sizeof(u32_t);
  233. buf[i][tmp_idx++] = (unsigned char) type;
  234. if (type == 1 && i < actual_data_num)
  235. {
  236. buf[i][tmp_idx++] = (unsigned char) 0;
  237. buf[i][tmp_idx++] = (unsigned char) 0;
  238. } else
  239. {
  240. buf[i][tmp_idx++] = (unsigned char) actual_data_num;
  241. buf[i][tmp_idx++] = (unsigned char) actual_redundant_num;
  242. }
  243. buf[i][tmp_idx++] = (unsigned char) i;
  244. tmp_output_buf[i]=buf[i]+tmp_idx; //////caution ,trick here.
  245. if(type==0)
  246. {
  247. output_len[i]=tmp_idx+fec_len;
  248. if(i<actual_data_num)
  249. {
  250. memcpy(buf[i]+tmp_idx,blob_output[i],fec_len);
  251. }
  252. }
  253. else
  254. {
  255. if(i<actual_data_num)
  256. {
  257. output_len[i]=tmp_idx+buf_s_len[i];
  258. memset(tmp_output_buf[i]+buf_s_len[i],0,fec_len-buf_s_len[i]);
  259. }
  260. else
  261. output_len[i]=tmp_idx+fec_len;
  262. }
  263. output_buf[i]=buf[i];
  264. }
  265. //output_len=blob_len+sizeof(u32_t)+4*sizeof(char);/////remember to change this 4,if modified the protocol
  266. rs_encode2(actual_data_num,actual_data_num+actual_redundant_num,tmp_output_buf,fec_len);
  267. mylog(log_trace,"!!! s= %d\n");
  268. ready_for_output=1;
  269. seq++;
  270. counter=0;
  271. output_n=actual_data_num+actual_redundant_num;
  272. blob_encode.clear();
  273. itimerspec its;
  274. memset(&its,0,sizeof(its));
  275. timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
  276. if(encode_fast_send&&type==1)
  277. {
  278. int packet_to_send[max_fec_packet_num+5]={0};
  279. int packet_to_send_counter=0;
  280. //assert(counter!=0);
  281. if(s!=0)
  282. packet_to_send[packet_to_send_counter++]=actual_data_num-1;
  283. for(int i=actual_data_num;i<actual_data_num+actual_redundant_num;i++)
  284. {
  285. packet_to_send[packet_to_send_counter++]=i;
  286. }
  287. output_n=packet_to_send_counter;//re write
  288. for(int i=0;i<packet_to_send_counter;i++)
  289. {
  290. output_buf[i]=output_buf[packet_to_send[i]];
  291. output_len[i]=output_len[packet_to_send[i]];
  292. }
  293. }
  294. }
  295. else
  296. {
  297. if(encode_fast_send&&s!=0&&type==1)
  298. {
  299. assert(counter>=1);
  300. assert(counter<=255);
  301. int buf_idx=counter-1;
  302. assert(ready_for_output==0);
  303. ready_for_output=1;
  304. output_n=1;
  305. int tmp_idx=0;
  306. write_u32(buf[buf_idx]+tmp_idx,seq);
  307. tmp_idx+=sizeof(u32_t);
  308. buf[buf_idx][tmp_idx++]=(unsigned char)type;
  309. buf[buf_idx][tmp_idx++]=(unsigned char)0;
  310. buf[buf_idx][tmp_idx++]=(unsigned char)0;
  311. buf[buf_idx][tmp_idx++]=(unsigned char)((u32_t)buf_idx);
  312. output_len[0]=buf_s_len[buf_idx]+tmp_idx;
  313. output_buf[0]=buf[buf_idx];
  314. }
  315. }
  316. if(s!=0&&delayed_append)
  317. {
  318. assert(type!=1);
  319. append(s,len);
  320. }
  321. return 0;
  322. }
  323. int fec_encode_manager_t::output(int &n,char ** &s_arr,int *&len)
  324. {
  325. if(!ready_for_output)
  326. {
  327. n=-1;
  328. len=0;
  329. s_arr=0;
  330. }
  331. else
  332. {
  333. n=output_n;
  334. len=output_len;
  335. s_arr=output_buf;
  336. ready_for_output=0;
  337. }
  338. return 0;
  339. }
  340. fec_decode_manager_t::fec_decode_manager_t()
  341. {
  342. re_init();
  343. }
  344. int fec_decode_manager_t::re_init()
  345. {
  346. for(int i=0;i<(int)fec_buff_size;i++)
  347. fec_data[i].used=0;
  348. ready_for_output=0;
  349. return 0;
  350. }
  351. int fec_decode_manager_t::input(char *s,int len)
  352. {
  353. assert(s!=0);
  354. int tmp_idx=0;
  355. u32_t seq=read_u32(s+tmp_idx);
  356. tmp_idx+=sizeof(u32_t);
  357. int type=(unsigned char)s[tmp_idx++];
  358. int data_num=(unsigned char)s[tmp_idx++];
  359. int redundant_num=(unsigned char)s[tmp_idx++];
  360. int inner_index=(unsigned char)s[tmp_idx++];
  361. len=len-tmp_idx;
  362. mylog(log_trace,"input\n");
  363. if(len<0)
  364. {
  365. mylog(log_warn,"len<0\n");
  366. return -1;
  367. }
  368. if(type==1&&len<(int)sizeof(u16_t))
  369. {
  370. mylog(log_warn,"type==1&&len<2\n");
  371. return -1;
  372. }
  373. if(type==1)
  374. {
  375. if(inner_index<data_num&&(int)( read_u16(s+tmp_idx)+sizeof(u16_t))!=len)
  376. {
  377. mylog(log_warn,"inner_index<data_num&&read_u16(s+tmp_idx)+sizeof(u16_t)!=len %d %d\n",(int)( read_u16(s+tmp_idx)+sizeof(u16_t)),len);
  378. return -1;
  379. }
  380. }
  381. if(data_num+redundant_num>max_fec_packet_num)
  382. {
  383. mylog(log_info,"failed here\n");
  384. return -1;
  385. }
  386. if(!anti_replay.is_vaild(seq))
  387. {
  388. //mylog(log_info,"failed here2\n");
  389. return 0;
  390. }
  391. if(mp[seq].group_mp.find(inner_index)!=mp[seq].group_mp.end() )
  392. {
  393. mylog(log_info,"dup inner_index\n");
  394. return -1;
  395. }
  396. int ok=1;
  397. if(mp[seq].type==-1)
  398. mp[seq].type=type;
  399. else
  400. {
  401. if(mp[seq].type!=type) ok=0;
  402. }
  403. if(type==0&&data_num==0)
  404. {
  405. mylog(log_warn,"unexpected here,data_num=0\n");
  406. return -1;
  407. }
  408. if(data_num!=0)
  409. {
  410. mp[seq].data_counter++;
  411. if(mp[seq].data_num==-1)
  412. {
  413. mp[seq].data_num=data_num;
  414. mp[seq].redundant_num=redundant_num;
  415. mp[seq].len=len;
  416. }
  417. else
  418. {
  419. if(mp[seq].data_num!=data_num||mp[seq].redundant_num!=redundant_num||mp[seq].len!=len)
  420. {
  421. mylog(log_warn,"unexpected here\n");
  422. ok=0;
  423. }
  424. }
  425. }
  426. if(ok==0)
  427. {
  428. //mylog(log_info,"ok=0\n");
  429. return -1;
  430. }
  431. else
  432. {
  433. //mylog(log_info,"ok=1\n");
  434. }
  435. if(fec_data[index].used!=0)
  436. {
  437. u32_t tmp_seq=fec_data[index].seq;
  438. anti_replay.set_invaild(tmp_seq);
  439. if(mp.find(tmp_seq)!=mp.end())
  440. {
  441. mp.erase(tmp_seq);
  442. }
  443. if(tmp_seq==seq)
  444. {
  445. return -1;
  446. }
  447. }
  448. fec_data[index].used=1;
  449. fec_data[index].seq=seq;
  450. fec_data[index].type=type;
  451. fec_data[index].data_num=data_num;
  452. fec_data[index].redundant_num=redundant_num;
  453. fec_data[index].idx=inner_index;
  454. fec_data[index].len=len;
  455. memcpy(fec_data[index].buf,s+tmp_idx,len);
  456. mp[seq].group_mp[inner_index]=index;
  457. map<int,int> &inner_mp=mp[seq].group_mp;
  458. int about_to_fec=0;
  459. if(type==0)
  460. {
  461. assert((int)inner_mp.size()<=data_num);
  462. if((int)inner_mp.size()==data_num)
  463. about_to_fec=1;
  464. }
  465. else
  466. {
  467. if(mp[seq].data_num!=-1)
  468. {
  469. //int old_data_num=data_num;
  470. //int old_redundant_num=redundant_num;
  471. //data_num=mp[seq].data_num;
  472. //redundant_num=mp[seq].redundant_num;
  473. /*if(mp[seq].data_counter>data_num) //invaild
  474. {
  475. mylog(log_warn,"unexpected mp[seq].data_counter>mp[seq].data_num\n");
  476. anti_replay.set_invaild(seq);
  477. return -1;
  478. }*/
  479. //assert((int)inner_mp.size()<=data_num);
  480. if((int)inner_mp.size()>=mp[seq].data_num)
  481. {
  482. about_to_fec=1;
  483. }
  484. /*
  485. else if(mp[seq].data_counter==data_num) //no need to fec . (received first redundant packet ,or received a data packet after redunant packet)
  486. {
  487. anti_replay.set_invaild(seq);// dont do fec,but still set invaild
  488. }
  489. else //mp[seq].data_counter < mp[seq].data_num
  490. {
  491. if((int)inner_mp.size()>data_num)
  492. {
  493. mylog(log_warn,"unexpected (int)inner_mp.size()>data_num+1\n");
  494. anti_replay.set_invaild(seq);
  495. return -1;
  496. }*/
  497. //if((int)inner_mp.size()==data_num)
  498. //about_to_fec=1;
  499. /*
  500. if((int)inner_mp.size()==data_num+1)
  501. {
  502. anti_replay.set_invaild(seq);
  503. if(old_data_num==0)
  504. return 0;
  505. else
  506. {
  507. mylog(log_warn,"data_num=0\n");
  508. return -1;
  509. }
  510. }*/
  511. //}
  512. }
  513. else
  514. {
  515. }
  516. //for()
  517. }
  518. if(about_to_fec)
  519. {
  520. int group_data_num=mp[seq].data_num;
  521. int group_redundant_num=mp[seq].redundant_num;
  522. //mylog(log_error,"fec here!\n");
  523. if(type==0)
  524. {
  525. char *fec_tmp_arr[max_fec_packet_num+5]={0};
  526. for(auto it=inner_mp.begin();it!=inner_mp.end();it++)
  527. {
  528. fec_tmp_arr[it->first]=fec_data[it->second].buf;
  529. }
  530. rs_decode2(group_data_num,group_data_num+group_redundant_num,fec_tmp_arr,len); //the input data has been modified in-place
  531. blob_decode.clear();
  532. for(int i=0;i<group_data_num;i++)
  533. {
  534. blob_decode.input(fec_tmp_arr[i],len);
  535. }
  536. blob_decode.output(output_n,output_s_arr,output_len_arr);
  537. assert(ready_for_output==0);
  538. ready_for_output=1;
  539. anti_replay.set_invaild(seq);
  540. }
  541. else
  542. {
  543. int max_len=-1;
  544. int fec_ok=1;
  545. int debug_num=inner_mp.size();
  546. //outupt_s_arr_buf[max_fec_packet_num+5]={0};
  547. //memset(output_s_arr_buf,0,sizeof(output_s_arr_buf));//in efficient
  548. for(int i=0;i<group_data_num+group_redundant_num;i++)
  549. {
  550. output_s_arr_buf[i]=0;
  551. }
  552. for(auto it=inner_mp.begin();it!=inner_mp.end();it++)
  553. {
  554. output_s_arr_buf[it->first]=fec_data[it->second].buf;
  555. assert(fec_data[it->second].len>=(int)sizeof(u16_t));
  556. if(fec_data[it->second].len > max_len)
  557. max_len=fec_data[it->second].len;
  558. }
  559. for(auto it=inner_mp.begin();it!=inner_mp.end();it++)
  560. {
  561. memset(fec_data[it->second].buf+fec_data[it->second].len,0,max_len-fec_data[it->second].len);
  562. }
  563. int missed_packet[max_fec_packet_num+5];
  564. int missed_packet_counter=0;
  565. for(int i=0;i<group_data_num;i++)
  566. {
  567. if(output_s_arr_buf[i]==0 ||i==inner_index) //only missed packet +current packet
  568. {
  569. missed_packet[missed_packet_counter++]=i;
  570. }
  571. }
  572. rs_decode2(group_data_num,group_data_num+group_redundant_num,output_s_arr_buf,max_len);
  573. for(int i=0;i<group_data_num;i++)
  574. {
  575. output_len_arr_buf[i]=read_u16(output_s_arr_buf[i]);
  576. output_s_arr_buf[i]+=sizeof(u16_t);
  577. if(output_len_arr_buf[i]>max_data_len)
  578. {
  579. mylog(log_warn,"invaild len %d,seq= %u,data_num= %d r_num= %d,i= %d\n",output_len_arr_buf[i],seq,group_data_num,group_redundant_num,i);
  580. fec_ok=0;
  581. for(int i=0;i<missed_packet_counter;i++)
  582. {
  583. log_bare(log_warn,"%d ",missed_packet[i]);
  584. }
  585. log_bare(log_warn,"\n");
  586. //break;
  587. }
  588. }
  589. if(fec_ok)
  590. {
  591. output_n=group_data_num;
  592. if(decode_fast_send)
  593. {
  594. output_n=missed_packet_counter;
  595. for(int i=0;i<missed_packet_counter;i++)
  596. {
  597. output_s_arr_buf[i]=output_s_arr_buf[missed_packet[i]];
  598. output_len_arr_buf[i]=output_len_arr_buf[missed_packet[i]];
  599. }
  600. }
  601. output_s_arr=output_s_arr_buf;
  602. output_len_arr=output_len_arr_buf;
  603. assert(ready_for_output==0);
  604. ready_for_output=1;
  605. }
  606. else
  607. {
  608. ready_for_output=0;
  609. }
  610. anti_replay.set_invaild(seq);
  611. }
  612. }
  613. else
  614. {
  615. if(decode_fast_send)
  616. {
  617. if(type==1&&data_num==0)
  618. {
  619. assert(ready_for_output==0);
  620. output_n=1;
  621. int check_len=read_u16(fec_data[index].buf);
  622. output_s_arr_buf[0]=fec_data[index].buf+sizeof(u16_t);
  623. output_len_arr_buf[0]=fec_data[index].len-sizeof(u16_t);
  624. if(output_len_arr_buf[0]!=check_len)
  625. {
  626. mylog(log_warn,"len mismatch %d %d\n",output_len_arr_buf[0],check_len);
  627. }
  628. output_s_arr=output_s_arr_buf;
  629. output_len_arr=output_len_arr_buf;
  630. ready_for_output=1;
  631. }
  632. }
  633. }
  634. index++;
  635. if(index==int(anti_replay_buff_size)) index=0;
  636. return 0;
  637. }
  638. int fec_decode_manager_t::output(int &n,char ** &s_arr,int* &len_arr)
  639. {
  640. if(!ready_for_output)
  641. {
  642. n=-1;
  643. s_arr=0;
  644. len_arr=0;
  645. }
  646. else
  647. {
  648. ready_for_output=0;
  649. n=output_n;
  650. s_arr=output_s_arr;
  651. len_arr=output_len_arr;
  652. }
  653. return 0;
  654. }