connection.h 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. /*
  2. * connection.h
  3. *
  4. * Created on: Sep 23, 2017
  5. * Author: root
  6. */
  7. #ifndef CONNECTION_H_
  8. #define CONNECTION_H_
  9. extern int disable_anti_replay;
  10. #include "connection.h"
  11. #include "common.h"
  12. #include "log.h"
  13. #include "delay_manager.h"
  14. #include "fd_manager.h"
  15. #include "fec_manager.h"
  16. extern int report_interval;
  17. const int disable_conv_clear = 0;
  18. void server_clear_function(u64_t u64);
  19. template <class T>
  20. struct conv_manager_t // manage the udp connections
  21. {
  22. // typedef hash_map map;
  23. unordered_map<T, u32_t> data_to_conv; // conv and u64 are both supposed to be uniq
  24. unordered_map<u32_t, T> conv_to_data;
  25. lru_collector_t<u32_t> lru;
  26. // unordered_map<u32_t,u64_t> conv_last_active_time;
  27. // unordered_map<u32_t,u64_t>::iterator clear_it;
  28. void (*additional_clear_function)(T data) = 0;
  29. long long last_clear_time;
  30. conv_manager_t() {
  31. // clear_it=conv_last_active_time.begin();
  32. long long last_clear_time = 0;
  33. additional_clear_function = 0;
  34. }
  35. ~conv_manager_t() {
  36. clear();
  37. }
  38. int get_size() {
  39. return conv_to_data.size();
  40. }
  41. void reserve() {
  42. data_to_conv.reserve(10007);
  43. conv_to_data.reserve(10007);
  44. // conv_last_active_time.reserve(10007);
  45. lru.mp.reserve(10007);
  46. }
  47. void clear() {
  48. if (disable_conv_clear) return;
  49. if (additional_clear_function != 0) {
  50. for (auto it = conv_to_data.begin(); it != conv_to_data.end(); it++) {
  51. // int fd=int((it->second<<32u)>>32u);
  52. additional_clear_function(it->second);
  53. }
  54. }
  55. data_to_conv.clear();
  56. conv_to_data.clear();
  57. lru.clear();
  58. // conv_last_active_time.clear();
  59. // clear_it=conv_last_active_time.begin();
  60. }
  61. u32_t get_new_conv() {
  62. u32_t conv = get_fake_random_number_nz();
  63. while (conv_to_data.find(conv) != conv_to_data.end()) {
  64. conv = get_fake_random_number_nz();
  65. }
  66. return conv;
  67. }
  68. int is_conv_used(u32_t conv) {
  69. return conv_to_data.find(conv) != conv_to_data.end();
  70. }
  71. int is_data_used(T data) {
  72. return data_to_conv.find(data) != data_to_conv.end();
  73. }
  74. u32_t find_conv_by_data(T data) {
  75. return data_to_conv[data];
  76. }
  77. T find_data_by_conv(u32_t conv) {
  78. return conv_to_data[conv];
  79. }
  80. int update_active_time(u32_t conv) {
  81. // return conv_last_active_time[conv]=get_current_time();
  82. lru.update(conv);
  83. return 0;
  84. }
  85. int insert_conv(u32_t conv, T data) {
  86. data_to_conv[data] = conv;
  87. conv_to_data[conv] = data;
  88. // conv_last_active_time[conv]=get_current_time();
  89. lru.new_key(conv);
  90. return 0;
  91. }
  92. int erase_conv(u32_t conv) {
  93. if (disable_conv_clear) return 0;
  94. T data = conv_to_data[conv];
  95. if (additional_clear_function != 0) {
  96. additional_clear_function(data);
  97. }
  98. conv_to_data.erase(conv);
  99. data_to_conv.erase(data);
  100. // conv_last_active_time.erase(conv);
  101. lru.erase(conv);
  102. return 0;
  103. }
  104. int clear_inactive(char *info = 0) {
  105. if (get_current_time() - last_clear_time > conv_clear_interval) {
  106. last_clear_time = get_current_time();
  107. return clear_inactive0(info);
  108. }
  109. return 0;
  110. }
  111. int clear_inactive0(char *info) {
  112. if (disable_conv_clear) return 0;
  113. unordered_map<u32_t, u64_t>::iterator it;
  114. unordered_map<u32_t, u64_t>::iterator old_it;
  115. // map<uint32_t,uint64_t>::iterator it;
  116. int cnt = 0;
  117. // it=clear_it;
  118. int size = lru.size();
  119. int num_to_clean = size / conv_clear_ratio + conv_clear_min; // clear 1/10 each time,to avoid latency glitch
  120. num_to_clean = min(num_to_clean, size);
  121. my_time_t current_time = get_current_time();
  122. for (;;) {
  123. if (cnt >= num_to_clean) break;
  124. if (lru.empty()) break;
  125. u32_t conv;
  126. my_time_t ts = lru.peek_back(conv);
  127. if (current_time - ts < conv_timeout) break;
  128. erase_conv(conv);
  129. if (info == 0) {
  130. mylog(log_info, "conv %x cleared\n", conv);
  131. } else {
  132. mylog(log_info, "[%s]conv %x cleared\n", info, conv);
  133. }
  134. cnt++;
  135. }
  136. return 0;
  137. }
  138. /*
  139. conv_manager_t();
  140. ~conv_manager_t();
  141. int get_size();
  142. void reserve();
  143. void clear();
  144. u32_t get_new_conv();
  145. int is_conv_used(u32_t conv);
  146. int is_u64_used(T u64);
  147. u32_t find_conv_by_u64(T u64);
  148. T find_u64_by_conv(u32_t conv);
  149. int update_active_time(u32_t conv);
  150. int insert_conv(u32_t conv,T u64);
  151. int erase_conv(u32_t conv);
  152. int clear_inactive(char * ip_port=0);
  153. int clear_inactive0(char * ip_port);*/
  154. }; // g_conv_manager;
  155. struct inner_stat_t {
  156. u64_t input_packet_num;
  157. u64_t input_packet_size;
  158. u64_t output_packet_num;
  159. u64_t output_packet_size;
  160. };
  161. struct stat_t {
  162. u64_t last_report_time;
  163. inner_stat_t normal_to_fec;
  164. inner_stat_t fec_to_normal;
  165. stat_t() {
  166. memset(this, 0, sizeof(stat_t));
  167. }
  168. void report_as_client() {
  169. if (report_interval != 0 && get_current_time() - last_report_time > u64_t(report_interval) * 1000) {
  170. last_report_time = get_current_time();
  171. inner_stat_t &a = normal_to_fec;
  172. inner_stat_t &b = fec_to_normal;
  173. mylog(log_info, "[report]client-->server:(original:%llu pkt;%llu byte) (fec:%llu pkt,%llu byte) server-->client:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte)\n",
  174. a.input_packet_num, a.input_packet_size, a.output_packet_num, a.output_packet_size,
  175. b.output_packet_num, b.output_packet_size, b.input_packet_num, b.input_packet_size);
  176. }
  177. }
  178. void report_as_server(address_t &addr) {
  179. if (report_interval != 0 && get_current_time() - last_report_time > u64_t(report_interval) * 1000) {
  180. last_report_time = get_current_time();
  181. inner_stat_t &a = fec_to_normal;
  182. inner_stat_t &b = normal_to_fec;
  183. mylog(log_info, "[report][%s]client-->server:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte) server-->client:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte)\n",
  184. addr.get_str(),
  185. a.output_packet_num, a.output_packet_size, a.input_packet_num, a.input_packet_size,
  186. b.input_packet_num, b.input_packet_size, b.output_packet_num, b.output_packet_size);
  187. }
  188. }
  189. };
  190. struct conn_info_t : not_copy_able_t // stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can
  191. // handle multiple clients
  192. {
  193. union tmp_union_t {
  194. conv_manager_t<address_t> c;
  195. conv_manager_t<u64_t> s;
  196. // avoid templates here and there, avoid pointer and type cast
  197. tmp_union_t() {
  198. if (program_mode == client_mode) {
  199. new (&c) conv_manager_t<address_t>();
  200. } else {
  201. assert(program_mode == server_mode);
  202. new (&s) conv_manager_t<u64_t>();
  203. }
  204. }
  205. ~tmp_union_t() {
  206. if (program_mode == client_mode) {
  207. c.~conv_manager_t<address_t>();
  208. } else {
  209. assert(program_mode == server_mode);
  210. s.~conv_manager_t<u64_t>();
  211. }
  212. }
  213. } conv_manager;
  214. fec_encode_manager_t fec_encode_manager;
  215. fec_decode_manager_t fec_decode_manager;
  216. ev_timer timer;
  217. // my_timer_t timer;
  218. u64_t last_active_time;
  219. stat_t stat;
  220. struct ev_loop *loop = 0;
  221. int local_listen_fd;
  222. int remote_fd; // only used for client
  223. fd64_t remote_fd64; // only used for client
  224. // ip_port_t ip_port;
  225. address_t addr; // only used for server
  226. conn_info_t() {
  227. if (program_mode == server_mode) {
  228. conv_manager.s.additional_clear_function = server_clear_function;
  229. } else {
  230. assert(program_mode == client_mode);
  231. }
  232. }
  233. ~conn_info_t() {
  234. if (loop)
  235. ev_timer_stop(loop, &timer);
  236. }
  237. void update_active_time() {
  238. last_active_time = get_current_time();
  239. }
  240. /*
  241. conn_info_t(const conn_info_t &b)
  242. {
  243. assert(0==1);
  244. }*/
  245. };
  246. /*
  247. struct conn_manager_t //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections
  248. {
  249. unordered_map<u64_t,conn_info_t*> mp;//<ip,port> to conn_info_t;
  250. unordered_map<u64_t,conn_info_t*>::iterator clear_it;
  251. long long last_clear_time;
  252. conn_manager_t();
  253. conn_manager_t(const conn_info_t &b)
  254. {
  255. assert(0==1);
  256. }
  257. int exist(ip_port_t);
  258. conn_info_t *& find_p(ip_port_t); //be aware,the adress may change after rehash
  259. conn_info_t & find(ip_port_t) ; //be aware,the adress may change after rehash
  260. int insert(ip_port_t);
  261. int erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it);
  262. int clear_inactive();
  263. int clear_inactive0();
  264. };*/
  265. struct conn_manager_t // manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections
  266. {
  267. unordered_map<address_t, conn_info_t *> mp; // put it at end so that it de-consturcts first
  268. unordered_map<address_t, conn_info_t *>::iterator clear_it;
  269. long long last_clear_time;
  270. conn_manager_t();
  271. int exist(address_t addr);
  272. conn_info_t *&find_insert_p(address_t addr); // be aware,the adress may change after rehash //not true?
  273. conn_info_t &find_insert(address_t addr); // be aware,the adress may change after rehash
  274. int erase(unordered_map<address_t, conn_info_t *>::iterator erase_it);
  275. int clear_inactive();
  276. int clear_inactive0();
  277. };
  278. extern conn_manager_t conn_manager;
  279. #endif /* CONNECTION_H_ */