connection.cpp 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. /*
  2. * connection.cpp
  3. *
  4. * Created on: Sep 23, 2017
  5. * Author: root
  6. */
  7. #include "connection.h"
  8. int disable_anti_replay=0;//if anti_replay windows is diabled
  9. const int disable_conv_clear=0;//a udp connection in the multiplexer is called conversation in this program,conv for short.
  10. const int disable_conn_clear=0;//a raw connection is called conn.
  11. conn_manager_t conn_manager;
  12. void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection),
  13. //so we have to close the fd when conv expires
  14. {
  15. int fd64=u64;
  16. assert(fd_manager.exist(fd64));
  17. fd_manager.close(fd64);
  18. }
  19. conv_manager_t::conv_manager_t()
  20. {
  21. clear_it=conv_last_active_time.begin();
  22. long long last_clear_time=0;
  23. }
  24. conv_manager_t::~conv_manager_t()
  25. {
  26. clear();
  27. }
  28. int conv_manager_t::get_size()
  29. {
  30. return conv_to_u64.size();
  31. }
  32. void conv_manager_t::reserve()
  33. {
  34. u64_to_conv.reserve(10007);
  35. conv_to_u64.reserve(10007);
  36. conv_last_active_time.reserve(10007);
  37. }
  38. void conv_manager_t::clear()
  39. {
  40. if(disable_conv_clear) return ;
  41. if(program_mode==server_mode)
  42. {
  43. for(it=conv_to_u64.begin();it!=conv_to_u64.end();it++)
  44. {
  45. //int fd=int((it->second<<32u)>>32u);
  46. server_clear_function( it->second);
  47. }
  48. }
  49. u64_to_conv.clear();
  50. conv_to_u64.clear();
  51. conv_last_active_time.clear();
  52. clear_it=conv_last_active_time.begin();
  53. }
  54. u32_t conv_manager_t::get_new_conv()
  55. {
  56. u32_t conv=get_true_random_number_nz();
  57. while(conv_to_u64.find(conv)!=conv_to_u64.end())
  58. {
  59. conv=get_true_random_number_nz();
  60. }
  61. return conv;
  62. }
  63. int conv_manager_t::is_conv_used(u32_t conv)
  64. {
  65. return conv_to_u64.find(conv)!=conv_to_u64.end();
  66. }
  67. int conv_manager_t::is_u64_used(u64_t u64)
  68. {
  69. return u64_to_conv.find(u64)!=u64_to_conv.end();
  70. }
  71. u32_t conv_manager_t::find_conv_by_u64(u64_t u64)
  72. {
  73. return u64_to_conv[u64];
  74. }
  75. u64_t conv_manager_t::find_u64_by_conv(u32_t conv)
  76. {
  77. return conv_to_u64[conv];
  78. }
  79. int conv_manager_t::update_active_time(u32_t conv)
  80. {
  81. return conv_last_active_time[conv]=get_current_time();
  82. }
  83. int conv_manager_t::insert_conv(u32_t conv,u64_t u64)//////todo add capacity
  84. {
  85. int bucket_size_before=conv_last_active_time.bucket_count();
  86. u64_to_conv[u64]=conv;
  87. conv_to_u64[conv]=u64;
  88. conv_last_active_time[conv]=get_current_time();
  89. int bucket_size_after=conv_last_active_time.bucket_count();
  90. if(bucket_size_after!=bucket_size_before)
  91. clear_it=conv_last_active_time.begin();
  92. return 0;
  93. }
  94. int conv_manager_t::erase_conv(u32_t conv)
  95. {
  96. if(disable_conv_clear) return 0;
  97. u64_t u64=conv_to_u64[conv];
  98. if(program_mode==server_mode)
  99. {
  100. server_clear_function(u64);
  101. }
  102. conv_to_u64.erase(conv);
  103. u64_to_conv.erase(u64);
  104. conv_last_active_time.erase(conv);
  105. return 0;
  106. }
  107. int conv_manager_t::clear_inactive(char * ip_port)
  108. {
  109. if(get_current_time()-last_clear_time>conv_clear_interval)
  110. {
  111. last_clear_time=get_current_time();
  112. return clear_inactive0(ip_port);
  113. }
  114. return 0;
  115. }
  116. int conv_manager_t::clear_inactive0(char * ip_port)
  117. {
  118. if(disable_conv_clear) return 0;
  119. //map<uint32_t,uint64_t>::iterator it;
  120. int cnt=0;
  121. it=clear_it;
  122. int size=conv_last_active_time.size();
  123. int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch
  124. num_to_clean=min(num_to_clean,size);
  125. u64_t current_time=get_current_time();
  126. for(;;)
  127. {
  128. if(cnt>=num_to_clean) break;
  129. if(conv_last_active_time.begin()==conv_last_active_time.end()) break;
  130. if(it==conv_last_active_time.end())
  131. {
  132. it=conv_last_active_time.begin();
  133. }
  134. if( current_time -it->second >conv_timeout )
  135. {
  136. //mylog(log_info,"inactive conv %u cleared \n",it->first);
  137. old_it=it;
  138. it++;
  139. u32_t conv= old_it->first;
  140. erase_conv(old_it->first);
  141. if(ip_port==0)
  142. {
  143. mylog(log_info,"conv %x cleared\n",conv);
  144. }
  145. else
  146. {
  147. mylog(log_info,"[%s]conv %x cleared\n",ip_port,conv);
  148. }
  149. }
  150. else
  151. {
  152. it++;
  153. }
  154. cnt++;
  155. }
  156. return 0;
  157. }
  158. conn_manager_t::conn_manager_t()
  159. {
  160. //ready_num=0;
  161. mp.reserve(100007);
  162. //fd64_mp.reserve(100007);
  163. clear_it=mp.begin();
  164. last_clear_time=0;
  165. }
  166. int conn_manager_t::exist(ip_port_t ip_port)
  167. {
  168. u64_t u64=ip_port.to_u64();
  169. if(mp.find(u64)!=mp.end())
  170. {
  171. return 1;
  172. }
  173. return 0;
  174. }
  175. /*
  176. int insert(uint32_t ip,uint16_t port)
  177. {
  178. uint64_t u64=0;
  179. u64=ip;
  180. u64<<=32u;
  181. u64|=port;
  182. mp[u64];
  183. return 0;
  184. }*/
  185. conn_info_t *& conn_manager_t::find_p(ip_port_t ip_port) //todo capacity
  186. //be aware,the adress may change after rehash
  187. {
  188. assert(exist(ip_port));
  189. u64_t u64=ip_port.to_u64();
  190. return mp[u64];
  191. }
  192. conn_info_t & conn_manager_t::find(ip_port_t ip_port) //be aware,the adress may change after rehash
  193. {
  194. assert(exist(ip_port));
  195. u64_t u64=ip_port.to_u64();
  196. return *mp[u64];
  197. }
  198. int conn_manager_t::insert(ip_port_t ip_port)
  199. {
  200. assert(!exist(ip_port));
  201. int bucket_size_before=mp.bucket_count();
  202. mp[ip_port.to_u64()]=new conn_info_t;
  203. int bucket_size_after=mp.bucket_count();
  204. if(bucket_size_after!=bucket_size_before)
  205. clear_it=mp.begin();
  206. return 0;
  207. }
  208. /*
  209. int conn_manager_t::exist_fd64(fd64_t fd64)
  210. {
  211. return fd64_mp.find(fd64)!=fd64_mp.end();
  212. }
  213. void conn_manager_t::insert_fd64(fd64_t fd64,ip_port_t ip_port)
  214. {
  215. assert(exist_ip_port(ip_port));
  216. u64_t u64=ip_port.to_u64();
  217. fd64_mp[fd64]=u64;
  218. }
  219. ip_port_t conn_manager_t::find_by_fd64(fd64_t fd64)
  220. {
  221. assert(exist_fd64(fd64));
  222. ip_port_t res;
  223. res.from_u64(fd64_mp[fd64]);
  224. return res;
  225. }*/
  226. int conn_manager_t::erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it)
  227. {
  228. /*
  229. if(erase_it->second->state.server_current_state==server_ready)
  230. {
  231. ready_num--;
  232. assert(i32_t(ready_num)!=-1);
  233. assert(erase_it->second!=0);
  234. assert(erase_it->second->timer_fd !=0);
  235. assert(erase_it->second->oppsite_const_id!=0);
  236. assert(const_id_mp.find(erase_it->second->oppsite_const_id)!=const_id_mp.end());
  237. assert(timer_fd_mp.find(erase_it->second->timer_fd)!=timer_fd_mp.end());
  238. const_id_mp.erase(erase_it->second->oppsite_const_id);
  239. timer_fd_mp.erase(erase_it->second->timer_fd);
  240. close(erase_it->second->timer_fd);// close will auto delte it from epoll
  241. delete(erase_it->second);
  242. mp.erase(erase_it->first);
  243. }
  244. else*/
  245. ////////todo close and erase timer_fd ,check fd64 empty
  246. delete(erase_it->second);
  247. mp.erase(erase_it->first);
  248. return 0;
  249. }
  250. int conn_manager_t::clear_inactive()
  251. {
  252. if(get_current_time()-last_clear_time>conn_clear_interval)
  253. {
  254. last_clear_time=get_current_time();
  255. return clear_inactive0();
  256. }
  257. return 0;
  258. }
  259. int conn_manager_t::clear_inactive0()
  260. {
  261. unordered_map<u64_t,conn_info_t*>::iterator it;
  262. unordered_map<u64_t,conn_info_t*>::iterator old_it;
  263. if(disable_conn_clear) return 0;
  264. //map<uint32_t,uint64_t>::iterator it;
  265. int cnt=0;
  266. it=clear_it;
  267. int size=mp.size();
  268. int num_to_clean=size/conn_clear_ratio+conn_clear_min; //clear 1/10 each time,to avoid latency glitch
  269. mylog(log_trace,"mp.size() %d\n", size);
  270. num_to_clean=min(num_to_clean,(int)mp.size());
  271. u64_t current_time=get_current_time();
  272. for(;;)
  273. {
  274. if(cnt>=num_to_clean) break;
  275. if(mp.begin()==mp.end()) break;
  276. if(it==mp.end())
  277. {
  278. it=mp.begin();
  279. }
  280. else if(it->second->conv_manager.get_size() >0)
  281. {
  282. it++;
  283. }
  284. else
  285. {
  286. mylog(log_info,"[%s:%d]inactive conn cleared \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first));
  287. old_it=it;
  288. it++;
  289. erase(old_it);
  290. }
  291. cnt++;
  292. }
  293. return 0;
  294. }