fec_manager.cpp 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. /*
  2. * fec_manager.cpp
  3. *
  4. * Created on: Sep 27, 2017
  5. * Author: root
  6. */
  7. #include "fec_manager.h"
  8. #include "log.h"
  9. #include "common.h"
  10. #include "lib/rs.h"
  11. #include "fd_manager.h"
  12. blob_encode_t::blob_encode_t()
  13. {
  14. clear();
  15. }
  16. int blob_encode_t::clear()
  17. {
  18. counter=0;
  19. current_len=(int)sizeof(u32_t);
  20. return 0;
  21. }
  22. int blob_encode_t::get_num()
  23. {
  24. return counter;
  25. }
  26. int blob_encode_t::get_shard_len(int n)
  27. {
  28. return round_up_div(current_len,n);
  29. }
  30. int blob_encode_t::get_shard_len(int n,int next_packet_len)
  31. {
  32. return round_up_div(current_len+(int)sizeof(u16_t)+next_packet_len,n);
  33. }
  34. int blob_encode_t::input(char *s,int len)
  35. {
  36. assert(current_len+len+sizeof(u16_t) <=max_fec_packet_num*buf_len);
  37. assert(len<=65535&&len>=0);
  38. counter++;
  39. assert(counter<=max_normal_packet_num);
  40. write_u16(buf+current_len,len);
  41. current_len+=sizeof(u16_t);
  42. memcpy(buf+current_len,s,len);
  43. current_len+=len;
  44. return 0;
  45. }
  46. int blob_encode_t::output(int n,char ** &s_arr,int & len)
  47. {
  48. len=round_up_div(current_len,n);
  49. write_u32(buf,counter);
  50. for(int i=0;i<n;i++)
  51. {
  52. output_arr[i]=buf+len*i;
  53. }
  54. s_arr=output_arr;
  55. return 0;
  56. }
  57. blob_decode_t::blob_decode_t()
  58. {
  59. clear();
  60. }
  61. int blob_decode_t::clear()
  62. {
  63. current_len=0;
  64. last_len=-1;
  65. counter=0;
  66. return 0;
  67. }
  68. int blob_decode_t::input(char *s,int len)
  69. {
  70. if(last_len!=-1)
  71. {
  72. assert(last_len==len);
  73. }
  74. counter++;
  75. assert(counter<=max_fec_packet_num);
  76. last_len=len;
  77. assert(current_len+len+100<(int)sizeof(buf));
  78. memcpy(buf+current_len,s,len);
  79. current_len+=len;
  80. return 0;
  81. }
  82. int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr)
  83. {
  84. int parser_pos=0;
  85. if(parser_pos+(int)sizeof(u32_t)>current_len) return -1;
  86. n=(int)read_u32(buf+parser_pos);
  87. if(n>max_normal_packet_num) {mylog(log_info,"failed 1\n");return -1;}
  88. s_arr=s_buf;
  89. len_arr=len_buf;
  90. parser_pos+=sizeof(u32_t);
  91. for(int i=0;i<n;i++)
  92. {
  93. if(parser_pos+(int)sizeof(u16_t)>current_len) {mylog(log_info,"failed2 \n");return -1;}
  94. len_arr[i]=(int)read_u16(buf+parser_pos);
  95. parser_pos+=(int)sizeof(u16_t);
  96. if(parser_pos+len_arr[i]>current_len) {mylog(log_info,"failed 3 %d %d %d\n",parser_pos,len_arr[i],current_len);return -1;}
  97. s_arr[i]=buf+parser_pos;
  98. parser_pos+=len_arr[i];
  99. }
  100. return 0;
  101. }
  102. fec_encode_manager_t::fec_encode_manager_t()
  103. {
  104. //int timer_fd;
  105. if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
  106. {
  107. mylog(log_fatal,"timer_fd create error");
  108. myexit(1);
  109. }
  110. timer_fd64=fd_manager.create(timer_fd);
  111. re_init(4,2,1200,100,10000);
  112. }
  113. fec_encode_manager_t::~fec_encode_manager_t()
  114. {
  115. fd_manager.fd64_close(timer_fd64);
  116. }
  117. u64_t fec_encode_manager_t::get_timer_fd64()
  118. {
  119. return timer_fd64;
  120. }
  121. int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time)
  122. {
  123. fec_data_num=data_num;
  124. fec_redundant_num=redundant_num;
  125. fec_mtu=mtu;
  126. fec_pending_num=pending_num;
  127. fec_pending_time=pending_time;
  128. counter=0;
  129. blob_encode.clear();
  130. ready_for_output=0;
  131. seq=0;
  132. itimerspec zero_its;
  133. memset(&zero_its, 0, sizeof(zero_its));
  134. timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
  135. return 0;
  136. }
  137. int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
  138. {
  139. if(s==0 ||blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu||counter>=fec_pending_num)
  140. {
  141. char ** blob_output;
  142. int blob_len;
  143. mylog(log_debug,"counter=%d\n",counter);
  144. if(counter==0)
  145. {
  146. if(s==0) return 0;//relax this restriction temporarily
  147. else mylog(log_warn,"message too long,ignored\n");
  148. }
  149. blob_encode.output(fec_data_num,blob_output,blob_len);
  150. for(int i=0;i<fec_data_num+fec_redundant_num;i++)
  151. {
  152. int tmp_idx=0;
  153. write_u32(buf[i]+tmp_idx,seq);
  154. tmp_idx+=sizeof(u32_t);
  155. buf[i][tmp_idx++]=(unsigned char)0;
  156. buf[i][tmp_idx++]=(unsigned char)fec_data_num;
  157. buf[i][tmp_idx++]=(unsigned char)fec_redundant_num;
  158. buf[i][tmp_idx++]=(unsigned char)i;
  159. output_buf[i]=buf[i]+tmp_idx;
  160. if(i<fec_data_num)
  161. {
  162. memcpy(buf[i]+tmp_idx,blob_output[i],blob_len);
  163. tmp_idx+=blob_len;
  164. }
  165. }
  166. output_len=blob_len+sizeof(u32_t)+4*sizeof(char);/////remember to change this 4,if modified the protocol
  167. rs_encode2(fec_data_num,fec_data_num+fec_redundant_num,output_buf,blob_len);
  168. for(int i=0;i<fec_data_num+fec_redundant_num;i++)
  169. {
  170. output_buf[i]=buf[i];
  171. }
  172. ready_for_output=1;
  173. seq++;
  174. counter=0;
  175. blob_encode.clear();
  176. }
  177. if(s!=0)
  178. {
  179. if(counter==0)
  180. {
  181. itimerspec its;
  182. memset(&its.it_interval,0,sizeof(its.it_interval));
  183. my_time_t tmp_time=fec_pending_time+get_current_time_us();
  184. its.it_value.tv_sec=tmp_time/1000000llu;
  185. its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu;
  186. timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
  187. }
  188. blob_encode.input(s,len);
  189. counter++;
  190. }
  191. return 0;
  192. }
  193. int fec_encode_manager_t::output(int &n,char ** &s_arr,int &len)
  194. {
  195. if(!ready_for_output)
  196. {
  197. n=-1;
  198. len=-1;
  199. s_arr=0;
  200. }
  201. else
  202. {
  203. n=fec_data_num+fec_redundant_num;
  204. len=output_len;
  205. s_arr=output_buf;
  206. ready_for_output=0;
  207. }
  208. return 0;
  209. }
  210. fec_decode_manager_t::fec_decode_manager_t()
  211. {
  212. re_init();
  213. }
  214. int fec_decode_manager_t::re_init()
  215. {
  216. for(int i=0;i<(int)fec_buff_size;i++)
  217. fec_data[i].used=0;
  218. ready_for_output=0;
  219. return 0;
  220. }
  221. int fec_decode_manager_t::input(char *s,int len)
  222. {
  223. assert(s!=0);
  224. int tmp_idx=0;
  225. u32_t seq=read_u32(s+tmp_idx);
  226. tmp_idx+=sizeof(u32_t);
  227. int type=(unsigned char)s[tmp_idx++];
  228. int data_num=(unsigned char)s[tmp_idx++];
  229. int redundant_num=(unsigned char)s[tmp_idx++];
  230. int inner_index=(unsigned char)s[tmp_idx++];
  231. len=len-tmp_idx;
  232. if(len<0)
  233. {
  234. return -1;
  235. }
  236. if(data_num+redundant_num>max_fec_packet_num)
  237. {
  238. return -1;
  239. }
  240. if(!anti_replay.is_vaild(seq))
  241. {
  242. return 0;
  243. }
  244. if(!mp[seq].empty())
  245. {
  246. int first_idx=mp[seq].begin()->second;
  247. int ok=1;
  248. if(fec_data[first_idx].data_num!=data_num)
  249. ok=0;
  250. if(fec_data[first_idx].redundant_num!=redundant_num)
  251. ok=0;
  252. if(fec_data[first_idx].len!=len)
  253. ok=0;
  254. if(ok==0)
  255. {
  256. return 0;
  257. }
  258. }
  259. if(fec_data[index].used!=0)
  260. {
  261. int tmp_seq=fec_data[index].seq;
  262. anti_replay.set_invaild(tmp_seq);
  263. if(mp.find(tmp_seq)!=mp.end())
  264. {
  265. mp.erase(tmp_seq);
  266. }
  267. }
  268. fec_data[index].used=1;
  269. fec_data[index].seq=seq;
  270. fec_data[index].type=type;
  271. fec_data[index].data_num=data_num;
  272. fec_data[index].redundant_num=redundant_num;
  273. fec_data[index].idx=inner_index;
  274. fec_data[index].len=len;
  275. memcpy(fec_data[index].buf,s+tmp_idx,len);
  276. mp[seq][inner_index]=index;
  277. index++;
  278. if(index==int(anti_replay_buff_size)) index=0;
  279. map<int,int> &inner_mp=mp[seq];
  280. assert((int)inner_mp.size()<=data_num);
  281. if((int)inner_mp.size()==data_num)
  282. {
  283. char *fec_tmp_arr[max_fec_packet_num+5]={0};
  284. for(auto it=inner_mp.begin();it!=inner_mp.end();it++)
  285. {
  286. fec_tmp_arr[it->first]=fec_data[it->second].buf;
  287. }
  288. rs_decode2(data_num,data_num+redundant_num,fec_tmp_arr,len); //the input data has been modified in-place
  289. blob_decode.clear();
  290. for(int i=0;i<data_num;i++)
  291. {
  292. blob_decode.input(fec_tmp_arr[i],len);
  293. }
  294. blob_decode.output(output_n,output_s_arr,output_len_arr);
  295. ready_for_output=1;
  296. anti_replay.set_invaild(seq);
  297. }
  298. return 0;
  299. }
  300. int fec_decode_manager_t::output(int &n,char ** &s_arr,int* &len_arr)
  301. {
  302. if(!ready_for_output)
  303. {
  304. n=-1;
  305. s_arr=0;
  306. len_arr=0;
  307. }
  308. else
  309. {
  310. ready_for_output=0;
  311. n=output_n;
  312. s_arr=output_s_arr;
  313. len_arr=output_len_arr;
  314. }
  315. return 0;
  316. }