fec_manager.h 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. /*
  2. * fec_manager.h
  3. *
  4. * Created on: Sep 27, 2017
  5. * Author: root
  6. */
  7. #ifndef FEC_MANAGER_H_
  8. #define FEC_MANAGER_H_
  9. #include "common.h"
  10. #include "log.h"
  11. #include "lib/rs.h"
  12. const int max_blob_packet_num=30000;//how many packet can be contain in a blob_t ,can be set very large
  13. const u32_t anti_replay_buff_size=30000;//can be set very large
  14. const int max_fec_packet_num=255;// this is the limitation of the rs lib
  15. extern u32_t fec_buff_num;
  16. const int rs_str_len=max_fec_packet_num*10+100;
  17. extern int header_overhead;
  18. extern int debug_fec;
  19. struct fec_parameter_t
  20. {
  21. int version=0;
  22. int mtu=1250;
  23. int queue_len=200;
  24. int timeout=8*1000;
  25. int mode=0;
  26. int rs_cnt=0;
  27. struct rs_parameter_t //parameters for reed solomon
  28. {
  29. unsigned char x;//AKA fec_data_num (x should be same as <index of rs_par>+1 at the moment)
  30. unsigned char y;//fec_redundant_num
  31. }rs_par[max_fec_packet_num+10];
  32. int rs_from_str(char * s)//todo inefficient
  33. {
  34. vector<string> str_vec=string_to_vec(s,",");
  35. if(str_vec.size()<1)
  36. {
  37. mylog(log_warn,"failed to parse [%s]\n",s);
  38. return -1;
  39. }
  40. vector<rs_parameter_t> par_vec;
  41. for(int i=0;i<(int)str_vec.size();i++)
  42. {
  43. rs_parameter_t tmp_par;
  44. string &tmp_str=str_vec[i];
  45. int x,y;
  46. if(sscanf((char *)tmp_str.c_str(),"%d:%d",&x,&y)!=2)
  47. {
  48. mylog(log_warn,"failed to parse [%s]\n",tmp_str.c_str());
  49. return -1;
  50. }
  51. if(x<1||y<0||x+y>max_fec_packet_num)
  52. {
  53. mylog(log_warn,"invaild value x=%d y=%d, x should >=1, y should >=0, x +y should <%d\n",x,y,max_fec_packet_num);
  54. return -1;
  55. }
  56. tmp_par.x=x;
  57. tmp_par.y=y;
  58. par_vec.push_back(tmp_par);
  59. }
  60. assert(par_vec.size()==str_vec.size());
  61. int found_problem=0;
  62. for(int i=1;i<(int)par_vec.size();i++)
  63. {
  64. if(par_vec[i].x<=par_vec[i-1].x)
  65. {
  66. mylog(log_warn,"error in [%s], x in x:y should be in ascend order\n",s);
  67. return -1;
  68. }
  69. int now_x=par_vec[i].x;
  70. int now_y=par_vec[i].y;
  71. int pre_x=par_vec[i-1].x;
  72. int pre_y=par_vec[i-1].y;
  73. double now_ratio=double(par_vec[i].y)/par_vec[i].x;
  74. double pre_ratio=double(par_vec[i-1].y)/par_vec[i-1].x;
  75. if(pre_ratio+0.0001<now_ratio)
  76. {
  77. if(found_problem==0)
  78. {
  79. mylog(log_warn,"possible problems: %d/%d<%d/%d",pre_y,pre_x,now_y,now_x);
  80. found_problem=1;
  81. }
  82. else
  83. {
  84. log_bare(log_warn,", %d/%d<%d/%d",pre_y,pre_x,now_y,now_x);
  85. }
  86. }
  87. }
  88. if(found_problem)
  89. {
  90. log_bare(log_warn," in %s\n",s);
  91. }
  92. { //special treatment for first parameter
  93. int x=par_vec[0].x;
  94. int y=par_vec[0].y;
  95. for(int i=1;i<=x;i++)
  96. {
  97. rs_par[i-1].x=i;
  98. rs_par[i-1].y=y;
  99. }
  100. }
  101. for(int i=1;i<(int)par_vec.size();i++)
  102. {
  103. int now_x=par_vec[i].x;
  104. int now_y=par_vec[i].y;
  105. int pre_x=par_vec[i-1].x;
  106. int pre_y=par_vec[i-1].y;
  107. rs_par[now_x-1].x=now_x;
  108. rs_par[now_x-1].y=now_y;
  109. double now_ratio=double(par_vec[i].y)/par_vec[i].x;
  110. double pre_ratio=double(par_vec[i-1].y)/par_vec[i-1].x;
  111. //double k= double(now_y-pre_y)/double(now_x-pre_x);
  112. for(int j=pre_x+1;j<=now_x-1;j++)
  113. {
  114. int in_x=j;
  115. //////// int in_y= double(pre_y) + double(in_x-pre_x)*k+ 0.9999;// round to upper
  116. double distance=now_x-pre_x;
  117. /////// double in_ratio=pre_ratio*(1.0-(in_x-pre_x)/distance) + now_ratio *(1.0- (now_x-in_x)/distance);
  118. ////// int in_y= in_x*in_ratio + 0.9999;
  119. int in_y= pre_y +(now_y-pre_y) *(in_x-pre_x)/distance +0.9999;
  120. if(in_x+in_y>max_fec_packet_num)
  121. {
  122. in_y=max_fec_packet_num-in_x;
  123. assert(in_y>=0&&in_y<=max_fec_packet_num);
  124. }
  125. rs_par[in_x-1].x=in_x;
  126. rs_par[in_x-1].y=in_y;
  127. }
  128. }
  129. rs_cnt=par_vec[par_vec.size()-1].x;
  130. return 0;
  131. }
  132. char *rs_to_str()//todo inefficient
  133. {
  134. static char res[rs_str_len];
  135. string tmp_string;
  136. char tmp_buf[100];
  137. assert(rs_cnt>=1);
  138. for(int i=0;i<rs_cnt;i++)
  139. {
  140. sprintf(tmp_buf,"%d:%d",int(rs_par[i].x),int(rs_par[i].y));
  141. if(i!=0)
  142. tmp_string+=",";
  143. tmp_string+=tmp_buf;
  144. }
  145. strcpy(res,tmp_string.c_str());
  146. return res;
  147. }
  148. rs_parameter_t get_tail()
  149. {
  150. assert(rs_cnt>=1);
  151. return rs_par[rs_cnt-1];
  152. }
  153. int clone(fec_parameter_t & other)
  154. {
  155. version=other.version;
  156. mtu=other.mtu;
  157. queue_len=other.queue_len;
  158. timeout=other.timeout;
  159. mode=other.mode;
  160. assert(other.rs_cnt>=1);
  161. rs_cnt=other.rs_cnt;
  162. memcpy(rs_par,other.rs_par,sizeof(rs_parameter_t)*rs_cnt);
  163. return 0;
  164. }
  165. };
  166. extern fec_parameter_t g_fec_par;
  167. //extern int dynamic_update_fec;
  168. const int anti_replay_timeout=60*1000;// 60s
  169. struct anti_replay_t
  170. {
  171. struct info_t
  172. {
  173. my_time_t my_time;
  174. int index;
  175. };
  176. u64_t replay_buffer[anti_replay_buff_size];
  177. unordered_map<u32_t,info_t> mp;
  178. int index;
  179. anti_replay_t()
  180. {
  181. clear();
  182. }
  183. int clear()
  184. {
  185. memset(replay_buffer,-1,sizeof(replay_buffer));
  186. mp.clear();
  187. mp.rehash(anti_replay_buff_size*3);
  188. index=0;
  189. return 0;
  190. }
  191. void set_invaild(u32_t seq)
  192. {
  193. if(is_vaild(seq)==0)
  194. {
  195. mylog(log_trace,"seq %u exist\n",seq);
  196. //assert(mp.find(seq)!=mp.end());
  197. //mp[seq].my_time=get_current_time_rough();
  198. return;
  199. }
  200. if(replay_buffer[index]!=u64_t(i64_t(-1)))
  201. {
  202. assert(mp.find(replay_buffer[index])!=mp.end());
  203. mp.erase(replay_buffer[index]);
  204. }
  205. replay_buffer[index]=seq;
  206. assert(mp.find(seq)==mp.end());
  207. mp[seq].my_time=get_current_time_rough();
  208. mp[seq].index=index;
  209. index++;
  210. if(index==int(anti_replay_buff_size)) index=0;
  211. }
  212. int is_vaild(u32_t seq)
  213. {
  214. if(mp.find(seq)==mp.end()) return 1;
  215. if(get_current_time_rough()-mp[seq].my_time>anti_replay_timeout)
  216. {
  217. replay_buffer[mp[seq].index]=u64_t(i64_t(-1));
  218. mp.erase(seq);
  219. return 1;
  220. }
  221. return 0;
  222. }
  223. };
  224. struct blob_encode_t
  225. {
  226. char input_buf[(max_fec_packet_num+5)*buf_len];
  227. int current_len;
  228. int counter;
  229. char *output_buf[max_fec_packet_num+100];
  230. blob_encode_t();
  231. int clear();
  232. int get_num();
  233. int get_shard_len(int n);
  234. int get_shard_len(int n,int next_packet_len);
  235. int input(char *s,int len); //len=use len=0 for second and following packet
  236. int output(int n,char ** &s_arr,int & len);
  237. };
  238. struct blob_decode_t
  239. {
  240. char input_buf[(max_fec_packet_num+5)*buf_len];
  241. int current_len;
  242. int last_len;
  243. int counter;
  244. char *output_buf[max_blob_packet_num+100];
  245. int output_len[max_blob_packet_num+100];
  246. blob_decode_t();
  247. int clear();
  248. int input(char *input,int len);
  249. int output(int &n,char ** &output,int *&len_arr);
  250. };
  251. class fec_encode_manager_t:not_copy_able_t
  252. {
  253. private:
  254. u32_t seq;
  255. //int fec_mode;
  256. //int fec_data_num,fec_redundant_num;
  257. //int fec_mtu;
  258. //int fec_queue_len;
  259. //int fec_timeout;
  260. fec_parameter_t fec_par;
  261. my_time_t first_packet_time;
  262. my_time_t first_packet_time_for_output;
  263. blob_encode_t blob_encode;
  264. char input_buf[max_fec_packet_num+5][buf_len];
  265. int input_len[max_fec_packet_num+100];
  266. char *output_buf[max_fec_packet_num+100];
  267. int output_len[max_fec_packet_num+100];
  268. int counter;
  269. //int timer_fd;
  270. //u64_t timer_fd64;
  271. int ready_for_output;
  272. u32_t output_n;
  273. int append(char *s,int len);
  274. ev_timer timer;
  275. struct ev_loop *loop=0;
  276. void (*cb) (struct ev_loop *loop, struct ev_timer *watcher, int revents)=0;
  277. public:
  278. fec_encode_manager_t();
  279. ~fec_encode_manager_t();
  280. fec_parameter_t & get_fec_par()
  281. {
  282. return fec_par;
  283. }
  284. void set_data(void * data)
  285. {
  286. timer.data=data;
  287. }
  288. void set_loop_and_cb(struct ev_loop *loop,void (*cb) (struct ev_loop *loop, struct ev_timer *watcher, int revents))
  289. {
  290. this->loop=loop;
  291. this->cb=cb;
  292. ev_init(&timer,cb);
  293. }
  294. int clear_data()
  295. {
  296. counter=0;
  297. blob_encode.clear();
  298. ready_for_output=0;
  299. seq=(u32_t)get_fake_random_number(); //TODO temp solution for a bug.
  300. if(loop)
  301. {
  302. ev_timer_stop(loop,&timer);
  303. }
  304. return 0;
  305. }
  306. int clear_all()
  307. {
  308. //itimerspec zero_its;
  309. //memset(&zero_its, 0, sizeof(zero_its));
  310. //timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
  311. if(loop)
  312. {
  313. ev_timer_stop(loop,&timer);
  314. loop=0;
  315. cb=0;
  316. }
  317. clear_data();
  318. return 0;
  319. }
  320. my_time_t get_first_packet_time()
  321. {
  322. return first_packet_time_for_output;
  323. }
  324. int get_pending_time()
  325. {
  326. return fec_par.timeout;
  327. }
  328. int get_type()
  329. {
  330. return fec_par.mode;
  331. }
  332. //u64_t get_timer_fd64();
  333. int reset_fec_parameter(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type);
  334. int input(char *s,int len/*,int &is_first_packet*/);
  335. int output(int &n,char ** &s_arr,int *&len);
  336. };
  337. struct fec_data_t
  338. {
  339. int used;
  340. u32_t seq;
  341. int type;
  342. int data_num;
  343. int redundant_num;
  344. int idx;
  345. char buf[buf_len];
  346. int len;
  347. };
  /*
   * Per-group bookkeeping stored in fec_decode_manager_t::mp.
   * Every integer field starts at -1; group_mp starts empty.
   */
  struct fec_group_t
  {
      int type = -1;
      int data_num = -1;
      int redundant_num = -1;
      int len = -1;

      //int data_counter=0;

      map<int, int> group_mp;
  };
  357. class fec_decode_manager_t:not_copy_able_t
  358. {
  359. anti_replay_t anti_replay;
  360. fec_data_t *fec_data=0;
  361. unordered_map<u32_t, fec_group_t> mp;
  362. blob_decode_t blob_decode;
  363. int index;
  364. int output_n;
  365. char ** output_s_arr;
  366. int * output_len_arr;
  367. int ready_for_output;
  368. char *output_s_arr_buf[max_fec_packet_num+100];//only for type=1,for type=0 the buf inside blot_t is used
  369. int output_len_arr_buf[max_fec_packet_num+100];//same
  370. public:
  371. fec_decode_manager_t()
  372. {
  373. fec_data=new fec_data_t[fec_buff_num+5];
  374. assert(fec_data!=0);
  375. clear();
  376. }
  377. /*
  378. fec_decode_manager_t(const fec_decode_manager_t &b)
  379. {
  380. assert(0==1);//not allowed to copy
  381. }*/
  382. ~fec_decode_manager_t()
  383. {
  384. mylog(log_debug,"fec_decode_manager destroyed\n");
  385. if(fec_data!=0)
  386. {
  387. mylog(log_debug,"fec_data freed\n");
  388. delete fec_data;
  389. }
  390. }
  391. int clear()
  392. {
  393. anti_replay.clear();
  394. mp.clear();
  395. mp.rehash(fec_buff_num*3);
  396. for(int i=0;i<(int)fec_buff_num;i++)
  397. fec_data[i].used=0;
  398. ready_for_output=0;
  399. index=0;
  400. return 0;
  401. }
  402. //int re_init();
  403. int input(char *s,int len);
  404. int output(int &n,char ** &s_arr,int* &len_arr);
  405. };
  406. #endif /* FEC_MANAGER_H_ */