fec_manager.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. /*
  2. * fec_manager.h
  3. *
  4. * Created on: Sep 27, 2017
  5. * Author: root
  6. */
  7. #ifndef FEC_MANAGER_H_
  8. #define FEC_MANAGER_H_
  9. #include "common.h"
  10. #include "log.h"
  11. #include "lib/rs.h"
  12. const int max_blob_packet_num = 30000; // how many packet can be contain in a blob_t ,can be set very large
  13. const u32_t anti_replay_buff_size = 30000; // can be set very large
  14. const int max_fec_packet_num = 255; // this is the limitation of the rs lib
  15. extern u32_t fec_buff_num;
  16. const int rs_str_len = max_fec_packet_num * 10 + 100;
  17. extern int header_overhead;
  18. extern int debug_fec_enc;
  19. extern int debug_fec_dec;
  20. struct fec_parameter_t {
  21. int version = 0;
  22. int mtu = default_mtu;
  23. int queue_len = 200;
  24. int timeout = 8 * 1000;
  25. int mode = 0;
  26. int rs_cnt = 0;
  27. struct rs_parameter_t // parameters for reed solomon
  28. {
  29. unsigned char x; // AKA fec_data_num (x should be same as <index of rs_par>+1 at the moment)
  30. unsigned char y; // fec_redundant_num
  31. } rs_par[max_fec_packet_num + 10];
  32. int rs_from_str(char *s) // todo inefficient
  33. {
  34. vector<string> str_vec = string_to_vec(s, ",");
  35. if (str_vec.size() < 1) {
  36. mylog(log_warn, "failed to parse [%s]\n", s);
  37. return -1;
  38. }
  39. vector<rs_parameter_t> par_vec;
  40. for (int i = 0; i < (int)str_vec.size(); i++) {
  41. rs_parameter_t tmp_par;
  42. string &tmp_str = str_vec[i];
  43. int x, y;
  44. if (sscanf((char *)tmp_str.c_str(), "%d:%d", &x, &y) != 2) {
  45. mylog(log_warn, "failed to parse [%s]\n", tmp_str.c_str());
  46. return -1;
  47. }
  48. if (x < 1 || y < 0 || x + y > max_fec_packet_num) {
  49. mylog(log_warn, "invaild value x=%d y=%d, x should >=1, y should >=0, x +y should <%d\n", x, y, max_fec_packet_num);
  50. return -1;
  51. }
  52. tmp_par.x = x;
  53. tmp_par.y = y;
  54. par_vec.push_back(tmp_par);
  55. }
  56. assert(par_vec.size() == str_vec.size());
  57. int found_problem = 0;
  58. for (int i = 1; i < (int)par_vec.size(); i++) {
  59. if (par_vec[i].x <= par_vec[i - 1].x) {
  60. mylog(log_warn, "error in [%s], x in x:y should be in ascend order\n", s);
  61. return -1;
  62. }
  63. int now_x = par_vec[i].x;
  64. int now_y = par_vec[i].y;
  65. int pre_x = par_vec[i - 1].x;
  66. int pre_y = par_vec[i - 1].y;
  67. double now_ratio = double(par_vec[i].y) / par_vec[i].x;
  68. double pre_ratio = double(par_vec[i - 1].y) / par_vec[i - 1].x;
  69. if (pre_ratio + 0.0001 < now_ratio) {
  70. if (found_problem == 0) {
  71. mylog(log_warn, "possible problems: %d/%d<%d/%d", pre_y, pre_x, now_y, now_x);
  72. found_problem = 1;
  73. } else {
  74. log_bare(log_warn, ", %d/%d<%d/%d", pre_y, pre_x, now_y, now_x);
  75. }
  76. }
  77. }
  78. if (found_problem) {
  79. log_bare(log_warn, " in %s\n", s);
  80. }
  81. { // special treatment for first parameter
  82. int x = par_vec[0].x;
  83. int y = par_vec[0].y;
  84. for (int i = 1; i <= x; i++) {
  85. rs_par[i - 1].x = i;
  86. rs_par[i - 1].y = y;
  87. }
  88. }
  89. for (int i = 1; i < (int)par_vec.size(); i++) {
  90. int now_x = par_vec[i].x;
  91. int now_y = par_vec[i].y;
  92. int pre_x = par_vec[i - 1].x;
  93. int pre_y = par_vec[i - 1].y;
  94. rs_par[now_x - 1].x = now_x;
  95. rs_par[now_x - 1].y = now_y;
  96. double now_ratio = double(par_vec[i].y) / par_vec[i].x;
  97. double pre_ratio = double(par_vec[i - 1].y) / par_vec[i - 1].x;
  98. // double k= double(now_y-pre_y)/double(now_x-pre_x);
  99. for (int j = pre_x + 1; j <= now_x - 1; j++) {
  100. int in_x = j;
  101. //////// int in_y= double(pre_y) + double(in_x-pre_x)*k+ 0.9999;// round to upper
  102. double distance = now_x - pre_x;
  103. /////// double in_ratio=pre_ratio*(1.0-(in_x-pre_x)/distance) + now_ratio *(1.0- (now_x-in_x)/distance);
  104. ////// int in_y= in_x*in_ratio + 0.9999;
  105. int in_y = pre_y + (now_y - pre_y) * (in_x - pre_x) / distance + 0.9999;
  106. if (in_x + in_y > max_fec_packet_num) {
  107. in_y = max_fec_packet_num - in_x;
  108. assert(in_y >= 0 && in_y <= max_fec_packet_num);
  109. }
  110. rs_par[in_x - 1].x = in_x;
  111. rs_par[in_x - 1].y = in_y;
  112. }
  113. }
  114. rs_cnt = par_vec[par_vec.size() - 1].x;
  115. return 0;
  116. }
  117. char *rs_to_str() // todo inefficient
  118. {
  119. static char res[rs_str_len];
  120. string tmp_string;
  121. char tmp_buf[100];
  122. assert(rs_cnt >= 1);
  123. for (int i = 0; i < rs_cnt; i++) {
  124. sprintf(tmp_buf, "%d:%d", int(rs_par[i].x), int(rs_par[i].y));
  125. if (i != 0)
  126. tmp_string += ",";
  127. tmp_string += tmp_buf;
  128. }
  129. strcpy(res, tmp_string.c_str());
  130. return res;
  131. }
  132. rs_parameter_t get_tail() {
  133. assert(rs_cnt >= 1);
  134. return rs_par[rs_cnt - 1];
  135. }
  136. int clone(fec_parameter_t &other) {
  137. version = other.version;
  138. mtu = other.mtu;
  139. queue_len = other.queue_len;
  140. timeout = other.timeout;
  141. mode = other.mode;
  142. assert(other.rs_cnt >= 1);
  143. rs_cnt = other.rs_cnt;
  144. memcpy(rs_par, other.rs_par, sizeof(rs_parameter_t) * rs_cnt);
  145. return 0;
  146. }
  147. int copy_fec(fec_parameter_t &other) {
  148. assert(other.rs_cnt >= 1);
  149. rs_cnt = other.rs_cnt;
  150. memcpy(rs_par, other.rs_par, sizeof(rs_parameter_t) * rs_cnt);
  151. return 0;
  152. }
  153. };
  154. extern fec_parameter_t g_fec_par;
  155. // extern int dynamic_update_fec;
  156. const int anti_replay_timeout = 120 * 1000; // 120s
  157. struct anti_replay_t {
  158. struct info_t {
  159. my_time_t my_time;
  160. int index;
  161. };
  162. u64_t replay_buffer[anti_replay_buff_size];
  163. unordered_map<u32_t, info_t> mp;
  164. int index;
  165. anti_replay_t() {
  166. clear();
  167. }
  168. int clear() {
  169. memset(replay_buffer, -1, sizeof(replay_buffer));
  170. mp.clear();
  171. mp.rehash(anti_replay_buff_size * 3);
  172. index = 0;
  173. return 0;
  174. }
  175. void set_invaild(u32_t seq) {
  176. if (is_vaild(seq) == 0) {
  177. mylog(log_trace, "seq %u exist\n", seq);
  178. // assert(mp.find(seq)!=mp.end());
  179. // mp[seq].my_time=get_current_time_rough();
  180. return;
  181. }
  182. if (replay_buffer[index] != u64_t(i64_t(-1))) {
  183. assert(mp.find(replay_buffer[index]) != mp.end());
  184. mp.erase(replay_buffer[index]);
  185. }
  186. replay_buffer[index] = seq;
  187. assert(mp.find(seq) == mp.end());
  188. mp[seq].my_time = get_current_time();
  189. mp[seq].index = index;
  190. index++;
  191. if (index == int(anti_replay_buff_size)) index = 0;
  192. }
  193. int is_vaild(u32_t seq) {
  194. if (mp.find(seq) == mp.end()) return 1;
  195. if (get_current_time() - mp[seq].my_time > anti_replay_timeout) {
  196. replay_buffer[mp[seq].index] = u64_t(i64_t(-1));
  197. mp.erase(seq);
  198. return 1;
  199. }
  200. return 0;
  201. }
  202. };
  203. struct blob_encode_t {
  204. char input_buf[(max_fec_packet_num + 5) * buf_len];
  205. int current_len;
  206. int counter;
  207. char *output_buf[max_fec_packet_num + 100];
  208. blob_encode_t();
  209. int clear();
  210. int get_num();
  211. int get_shard_len(int n);
  212. int get_shard_len(int n, int next_packet_len);
  213. int input(char *s, int len); // len=use len=0 for second and following packet
  214. int output(int n, char **&s_arr, int &len);
  215. };
  216. struct blob_decode_t {
  217. char input_buf[(max_fec_packet_num + 5) * buf_len];
  218. int current_len;
  219. int last_len;
  220. int counter;
  221. char *output_buf[max_blob_packet_num + 100];
  222. int output_len[max_blob_packet_num + 100];
  223. blob_decode_t();
  224. int clear();
  225. int input(char *input, int len);
  226. int output(int &n, char **&output, int *&len_arr);
  227. };
  228. class fec_encode_manager_t : not_copy_able_t {
  229. private:
  230. u32_t seq;
  231. // int fec_mode;
  232. // int fec_data_num,fec_redundant_num;
  233. // int fec_mtu;
  234. // int fec_queue_len;
  235. // int fec_timeout;
  236. fec_parameter_t fec_par;
  237. my_time_t first_packet_time;
  238. my_time_t first_packet_time_for_output;
  239. blob_encode_t blob_encode;
  240. char input_buf[max_fec_packet_num + 5][buf_len];
  241. int input_len[max_fec_packet_num + 100];
  242. char *output_buf[max_fec_packet_num + 100];
  243. int output_len[max_fec_packet_num + 100];
  244. int counter;
  245. // int timer_fd;
  246. // u64_t timer_fd64;
  247. int ready_for_output;
  248. u32_t output_n;
  249. int append(char *s, int len);
  250. ev_timer timer;
  251. struct ev_loop *loop = 0;
  252. void (*cb)(struct ev_loop *loop, struct ev_timer *watcher, int revents) = 0;
  253. public:
  254. fec_encode_manager_t();
  255. ~fec_encode_manager_t();
  256. fec_parameter_t &get_fec_par() {
  257. return fec_par;
  258. }
  259. void set_data(void *data) {
  260. timer.data = data;
  261. }
  262. void set_loop_and_cb(struct ev_loop *loop, void (*cb)(struct ev_loop *loop, struct ev_timer *watcher, int revents)) {
  263. this->loop = loop;
  264. this->cb = cb;
  265. ev_init(&timer, cb);
  266. }
  267. int clear_data() {
  268. counter = 0;
  269. blob_encode.clear();
  270. ready_for_output = 0;
  271. seq = (u32_t)get_fake_random_number(); // TODO temp solution for a bug.
  272. if (loop) {
  273. ev_timer_stop(loop, &timer);
  274. }
  275. return 0;
  276. }
  277. int clear_all() {
  278. // itimerspec zero_its;
  279. // memset(&zero_its, 0, sizeof(zero_its));
  280. // timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
  281. if (loop) {
  282. ev_timer_stop(loop, &timer);
  283. loop = 0;
  284. cb = 0;
  285. }
  286. clear_data();
  287. return 0;
  288. }
  289. my_time_t get_first_packet_time() {
  290. return first_packet_time_for_output;
  291. }
  292. int get_pending_time() {
  293. return fec_par.timeout;
  294. }
  295. int get_type() {
  296. return fec_par.mode;
  297. }
  298. // u64_t get_timer_fd64();
  299. int reset_fec_parameter(int data_num, int redundant_num, int mtu, int pending_num, int pending_time, int type);
  300. int input(char *s, int len /*,int &is_first_packet*/);
  301. int output(int &n, char **&s_arr, int *&len);
  302. };
  303. struct fec_data_t {
  304. int used;
  305. u32_t seq;
  306. int type;
  307. int data_num;
  308. int redundant_num;
  309. int idx;
  310. char buf[buf_len];
  311. int len;
  312. };
  313. struct fec_group_t {
  314. int type = -1;
  315. int data_num = -1;
  316. int redundant_num = -1;
  317. int len = -1;
  318. int fec_done = 0;
  319. // int data_counter=0;
  320. map<int, int> group_mp;
  321. };
  322. class fec_decode_manager_t : not_copy_able_t {
  323. anti_replay_t anti_replay;
  324. fec_data_t *fec_data = 0;
  325. unordered_map<u32_t, fec_group_t> mp;
  326. blob_decode_t blob_decode;
  327. int index;
  328. int output_n;
  329. char **output_s_arr;
  330. int *output_len_arr;
  331. int ready_for_output;
  332. char *output_s_arr_buf[max_fec_packet_num + 100]; // only for type=1,for type=0 the buf inside blot_t is used
  333. int output_len_arr_buf[max_fec_packet_num + 100]; // same
  334. public:
  335. fec_decode_manager_t() {
  336. fec_data = new fec_data_t[fec_buff_num + 5];
  337. assert(fec_data != 0);
  338. clear();
  339. }
  340. /*
  341. fec_decode_manager_t(const fec_decode_manager_t &b)
  342. {
  343. assert(0==1);//not allowed to copy
  344. }*/
  345. ~fec_decode_manager_t() {
  346. mylog(log_debug, "fec_decode_manager destroyed\n");
  347. if (fec_data != 0) {
  348. mylog(log_debug, "fec_data freed\n");
  349. delete[] fec_data;
  350. }
  351. }
  352. int clear() {
  353. anti_replay.clear();
  354. mp.clear();
  355. mp.rehash(fec_buff_num * 3);
  356. for (int i = 0; i < (int)fec_buff_num; i++)
  357. fec_data[i].used = 0;
  358. ready_for_output = 0;
  359. index = 0;
  360. return 0;
  361. }
  362. // int re_init();
  363. int input(char *s, int len);
  364. int output(int &n, char **&s_arr, int *&len_arr);
  365. };
  366. #endif /* FEC_MANAGER_H_ */