connection.cpp 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. /*
  2. * connection.cpp
  3. *
  4. * Created on: Sep 23, 2017
  5. * Author: root
  6. */
  7. #include "connection.h"
  8. int disable_anti_replay=0;//if anti_replay windows is diabled
  9. const int disable_conv_clear=0;//a udp connection in the multiplexer is called conversation in this program,conv for short.
  10. const int disable_conn_clear=0;//a raw connection is called conn.
  11. conn_manager_t conn_manager;
  12. void server_clear_function(u64_t u64);
  13. conv_manager_t::conv_manager_t()
  14. {
  15. clear_it=conv_last_active_time.begin();
  16. long long last_clear_time=0;
  17. //clear_function=0;
  18. }
  19. conv_manager_t::~conv_manager_t()
  20. {
  21. clear();
  22. }
  23. int conv_manager_t::get_size()
  24. {
  25. return conv_to_u64.size();
  26. }
  27. void conv_manager_t::reserve()
  28. {
  29. u64_to_conv.reserve(10007);
  30. conv_to_u64.reserve(10007);
  31. conv_last_active_time.reserve(10007);
  32. }
  33. void conv_manager_t::clear()
  34. {
  35. if(disable_conv_clear) return ;
  36. if(program_mode==server_mode)
  37. {
  38. for(it=conv_to_u64.begin();it!=conv_to_u64.end();it++)
  39. {
  40. //int fd=int((it->second<<32u)>>32u);
  41. server_clear_function( it->second);
  42. }
  43. }
  44. u64_to_conv.clear();
  45. conv_to_u64.clear();
  46. conv_last_active_time.clear();
  47. clear_it=conv_last_active_time.begin();
  48. }
  49. u32_t conv_manager_t::get_new_conv()
  50. {
  51. u32_t conv=get_true_random_number_nz();
  52. while(conv_to_u64.find(conv)!=conv_to_u64.end())
  53. {
  54. conv=get_true_random_number_nz();
  55. }
  56. return conv;
  57. }
  58. int conv_manager_t::is_conv_used(u32_t conv)
  59. {
  60. return conv_to_u64.find(conv)!=conv_to_u64.end();
  61. }
  62. int conv_manager_t::is_u64_used(u64_t u64)
  63. {
  64. return u64_to_conv.find(u64)!=u64_to_conv.end();
  65. }
  66. u32_t conv_manager_t::find_conv_by_u64(u64_t u64)
  67. {
  68. return u64_to_conv[u64];
  69. }
  70. u64_t conv_manager_t::find_u64_by_conv(u32_t conv)
  71. {
  72. return conv_to_u64[conv];
  73. }
  74. int conv_manager_t::update_active_time(u32_t conv)
  75. {
  76. return conv_last_active_time[conv]=get_current_time();
  77. }
  78. int conv_manager_t::insert_conv(u32_t conv,u64_t u64)
  79. {
  80. u64_to_conv[u64]=conv;
  81. conv_to_u64[conv]=u64;
  82. conv_last_active_time[conv]=get_current_time();
  83. return 0;
  84. }
  85. int conv_manager_t::erase_conv(u32_t conv)
  86. {
  87. if(disable_conv_clear) return 0;
  88. u64_t u64=conv_to_u64[conv];
  89. if(program_mode==server_mode)
  90. {
  91. server_clear_function(u64);
  92. }
  93. conv_to_u64.erase(conv);
  94. u64_to_conv.erase(u64);
  95. conv_last_active_time.erase(conv);
  96. return 0;
  97. }
  98. int conv_manager_t::clear_inactive(char * ip_port)
  99. {
  100. if(get_current_time()-last_clear_time>conv_clear_interval)
  101. {
  102. last_clear_time=get_current_time();
  103. return clear_inactive0(ip_port);
  104. }
  105. return 0;
  106. }
  107. int conv_manager_t::clear_inactive0(char * ip_port)
  108. {
  109. if(disable_conv_clear) return 0;
  110. //map<uint32_t,uint64_t>::iterator it;
  111. int cnt=0;
  112. it=clear_it;
  113. int size=conv_last_active_time.size();
  114. int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch
  115. num_to_clean=min(num_to_clean,size);
  116. u64_t current_time=get_current_time();
  117. for(;;)
  118. {
  119. if(cnt>=num_to_clean) break;
  120. if(conv_last_active_time.begin()==conv_last_active_time.end()) break;
  121. if(it==conv_last_active_time.end())
  122. {
  123. it=conv_last_active_time.begin();
  124. }
  125. if( current_time -it->second >conv_timeout )
  126. {
  127. //mylog(log_info,"inactive conv %u cleared \n",it->first);
  128. old_it=it;
  129. it++;
  130. u32_t conv= old_it->first;
  131. erase_conv(old_it->first);
  132. if(ip_port==0)
  133. {
  134. mylog(log_info,"conv %x cleared\n",conv);
  135. }
  136. else
  137. {
  138. mylog(log_info,"[%s]conv %x cleared\n",ip_port,conv);
  139. }
  140. }
  141. else
  142. {
  143. it++;
  144. }
  145. cnt++;
  146. }
  147. return 0;
  148. }
  149. conn_manager_t::conn_manager_t()
  150. {
  151. ready_num=0;
  152. mp.reserve(100007);
  153. fd64_mp.reserve(100007);
  154. clear_it=mp.begin();
  155. last_clear_time=0;
  156. }
  157. int conn_manager_t::exist_ip_port(ip_port_t ip_port)
  158. {
  159. u64_t u64=ip_port.to_u64();
  160. if(mp.find(u64)!=mp.end())
  161. {
  162. return 1;
  163. }
  164. return 0;
  165. }
  166. /*
  167. int insert(uint32_t ip,uint16_t port)
  168. {
  169. uint64_t u64=0;
  170. u64=ip;
  171. u64<<=32u;
  172. u64|=port;
  173. mp[u64];
  174. return 0;
  175. }*/
  176. conn_info_t *& conn_manager_t::find_insert_p(ip_port_t ip_port) //be aware,the adress may change after rehash
  177. {
  178. u64_t u64=ip_port.to_u64();
  179. unordered_map<u64_t,conn_info_t*>::iterator it=mp.find(u64);
  180. if(it==mp.end())
  181. {
  182. mp[u64]=new conn_info_t;
  183. }
  184. return mp[u64];
  185. }
  186. conn_info_t & conn_manager_t::find_insert(ip_port_t ip_port) //be aware,the adress may change after rehash
  187. {
  188. u64_t u64=ip_port.to_u64();
  189. unordered_map<u64_t,conn_info_t*>::iterator it=mp.find(u64);
  190. if(it==mp.end())
  191. {
  192. mp[u64]=new conn_info_t;
  193. }
  194. return *mp[u64];
  195. }
  196. int conn_manager_t::exist_fd64(fd64_t fd64)
  197. {
  198. return fd64_mp.find(fd64)!=fd64_mp.end();
  199. }
  200. void conn_manager_t::insert_fd64(fd64_t fd64,ip_port_t ip_port)
  201. {
  202. assert(exist_ip_port(ip_port));
  203. u64_t u64=ip_port.to_u64();
  204. fd64_mp[fd64]=u64;
  205. }
  206. ip_port_t conn_manager_t::find_by_fd64(fd64_t fd64)
  207. {
  208. assert(exist_fd64(fd64));
  209. ip_port_t res;
  210. res.from_u64(fd64_mp[fd64]);
  211. return res;
  212. }
  213. int conn_manager_t::erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it)
  214. {
  215. /*
  216. if(erase_it->second->state.server_current_state==server_ready)
  217. {
  218. ready_num--;
  219. assert(i32_t(ready_num)!=-1);
  220. assert(erase_it->second!=0);
  221. assert(erase_it->second->timer_fd !=0);
  222. assert(erase_it->second->oppsite_const_id!=0);
  223. assert(const_id_mp.find(erase_it->second->oppsite_const_id)!=const_id_mp.end());
  224. assert(timer_fd_mp.find(erase_it->second->timer_fd)!=timer_fd_mp.end());
  225. const_id_mp.erase(erase_it->second->oppsite_const_id);
  226. timer_fd_mp.erase(erase_it->second->timer_fd);
  227. close(erase_it->second->timer_fd);// close will auto delte it from epoll
  228. delete(erase_it->second);
  229. mp.erase(erase_it->first);
  230. }
  231. else*/
  232. ////////todo close and erase timer_fd ,check fd64 empty
  233. delete(erase_it->second);
  234. mp.erase(erase_it->first);
  235. return 0;
  236. }
  237. int conn_manager_t::clear_inactive()
  238. {
  239. if(get_current_time()-last_clear_time>conn_clear_interval)
  240. {
  241. last_clear_time=get_current_time();
  242. return clear_inactive0();
  243. }
  244. return 0;
  245. }
  246. int conn_manager_t::clear_inactive0()
  247. {
  248. unordered_map<u64_t,conn_info_t*>::iterator it;
  249. unordered_map<u64_t,conn_info_t*>::iterator old_it;
  250. if(disable_conn_clear) return 0;
  251. //map<uint32_t,uint64_t>::iterator it;
  252. int cnt=0;
  253. it=clear_it;
  254. int size=mp.size();
  255. int num_to_clean=size/conn_clear_ratio+conn_clear_min; //clear 1/10 each time,to avoid latency glitch
  256. mylog(log_trace,"mp.size() %d\n", size);
  257. num_to_clean=min(num_to_clean,(int)mp.size());
  258. u64_t current_time=get_current_time();
  259. for(;;)
  260. {
  261. if(cnt>=num_to_clean) break;
  262. if(mp.begin()==mp.end()) break;
  263. if(it==mp.end())
  264. {
  265. it=mp.begin();
  266. }
  267. else if(it->second->conv_manager.get_size() >0)
  268. {
  269. it++;
  270. }
  271. else
  272. {
  273. mylog(log_info,"[%s:%d]inactive conn cleared \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first));
  274. old_it=it;
  275. it++;
  276. erase(old_it);
  277. }
  278. cnt++;
  279. }
  280. return 0;
  281. }
  282. void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection),
  283. //so we have to close the fd when conv expires
  284. {
  285. int fd64=u64;
  286. int ret;
  287. assert(fd_manager.fd64_exist(fd64));
  288. int fd=fd_manager.fd64_to_fd(fd64);
  289. fd_manager.remove_fd64(fd64);
  290. ret= close(fd); //closed fd should be auto removed from epoll
  291. if (ret!=0)
  292. {
  293. mylog(log_fatal,"close fd %d failed !!!!\n",fd);
  294. myexit(-1); //this shouldnt happen
  295. }
  296. //mylog(log_fatal,"size:%d !!!!\n",conn_manager.udp_fd_mp.size());
  297. assert(conn_manager.fd64_mp.find(fd)!=conn_manager.fd64_mp.end());
  298. conn_manager.fd64_mp.erase(fd);
  299. }