connection.cpp 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. /*
  2. * connection.cpp
  3. *
  4. * Created on: Sep 23, 2017
  5. * Author: root
  6. */
  7. #include "connection.h"
  // Terminology used in this program: a UDP connection carried inside the
  // multiplexer is called a "conversation" ("conv" for short); a raw
  // connection is called a "conn".

  // When non-zero, conv_manager_t::clear_inactive0() returns without sweeping.
  const int disable_conv_clear = 0;

  // When non-zero, conn_manager_t::clear_inactive0() returns without sweeping.
  const int disable_conn_clear = 0;

  // Report interval setting; it is not read by any code in this part of the file.
  int report_interval = 0;
  11. void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection),
  12. //so we have to close the fd when conv expires
  13. {
  14. fd64_t fd64=u64;
  15. assert(fd_manager.exist(fd64));
  16. ev_io &watcher= fd_manager.get_info(fd64).io_watcher;
  17. ip_port_t &ip_port=fd_manager.get_info(fd64).ip_port;//
  18. assert(conn_manager.exist(ip_port));//
  19. ev_loop *loop =conn_manager.find(ip_port).loop; // overkill ? should we just use ev_default_loop(0)?
  20. ev_io_stop(loop,&watcher);
  21. fd_manager.fd64_close(fd64);
  22. }
  23. ////////////////////////////////////////////////////////////////////
  24. conv_manager_t::conv_manager_t()
  25. {
  26. clear_it=conv_last_active_time.begin();
  27. long long last_clear_time=0;
  28. reserve();
  29. }
  30. conv_manager_t::~conv_manager_t()
  31. {
  32. clear();
  33. }
  34. int conv_manager_t::get_size()
  35. {
  36. return conv_to_u64.size();
  37. }
  38. void conv_manager_t::reserve()
  39. {
  40. u64_to_conv.reserve(10007);
  41. conv_to_u64.reserve(10007);
  42. conv_last_active_time.reserve(10007);
  43. }
  44. void conv_manager_t::clear()
  45. {
  46. //if(disable_conv_clear) return ;//////what was the purpose of this code?
  47. if(client_or_server==server_mode)
  48. {
  49. for(auto it=conv_to_u64.begin();it!=conv_to_u64.end();it++)
  50. {
  51. //int fd=int((it->second<<32u)>>32u);
  52. server_clear_function( it->second);
  53. }
  54. }
  55. u64_to_conv.clear();
  56. conv_to_u64.clear();
  57. conv_last_active_time.clear();
  58. clear_it=conv_last_active_time.begin();
  59. }
  60. u32_t conv_manager_t::get_new_conv()
  61. {
  62. u32_t conv=get_fake_random_number_nz();
  63. while(conv_to_u64.find(conv)!=conv_to_u64.end())
  64. {
  65. conv=get_fake_random_number_nz();
  66. }
  67. return conv;
  68. }
  69. int conv_manager_t::is_conv_used(u32_t conv)
  70. {
  71. return conv_to_u64.find(conv)!=conv_to_u64.end();
  72. }
  73. int conv_manager_t::is_u64_used(u64_t u64)
  74. {
  75. return u64_to_conv.find(u64)!=u64_to_conv.end();
  76. }
  77. u32_t conv_manager_t::find_conv_by_u64(u64_t u64)
  78. {
  79. assert(is_u64_used(u64));
  80. return u64_to_conv[u64];
  81. }
  82. u64_t conv_manager_t::find_u64_by_conv(u32_t conv)
  83. {
  84. assert(is_conv_used(conv));
  85. return conv_to_u64[conv];
  86. }
  87. int conv_manager_t::update_active_time(u32_t conv)
  88. {
  89. assert(is_conv_used(conv));
  90. return conv_last_active_time[conv]=get_current_time();
  91. }
  92. int conv_manager_t::insert_conv(u32_t conv,u64_t u64)//////todo add capacity ///done at upper level
  93. {
  94. assert(!is_conv_used(conv));
  95. int bucket_size_before=conv_last_active_time.bucket_count();
  96. u64_to_conv[u64]=conv;
  97. conv_to_u64[conv]=u64;
  98. conv_last_active_time[conv]=get_current_time();
  99. int bucket_size_after=conv_last_active_time.bucket_count();
  100. if(bucket_size_after!=bucket_size_before)
  101. clear_it=conv_last_active_time.begin();
  102. return 0;
  103. }
  104. int conv_manager_t::erase_conv(u32_t conv)
  105. {
  106. //if(disable_conv_clear) return 0;
  107. assert(conv_last_active_time.find(conv)!=conv_last_active_time.end());
  108. u64_t u64=conv_to_u64[conv];
  109. if(client_or_server==server_mode)
  110. {
  111. server_clear_function(u64);
  112. }
  113. assert(conv_to_u64.find(conv)!=conv_to_u64.end());
  114. conv_to_u64.erase(conv);
  115. u64_to_conv.erase(u64);
  116. conv_last_active_time.erase(conv);
  117. return 0;
  118. }
  119. int conv_manager_t::clear_inactive(char * ip_port)
  120. {
  121. if(get_current_time()-last_clear_time>conv_clear_interval)
  122. {
  123. last_clear_time=get_current_time();
  124. return clear_inactive0(ip_port);
  125. }
  126. return 0;
  127. }
  128. int conv_manager_t::clear_inactive0(char * ip_port)
  129. {
  130. if(disable_conv_clear) return 0;
  131. //map<uint32_t,uint64_t>::iterator it;
  132. int cnt=0;
  133. auto it=clear_it;
  134. int size=conv_last_active_time.size();
  135. int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch
  136. num_to_clean=min(num_to_clean,size);
  137. u64_t current_time=get_current_time();
  138. for(;;)
  139. {
  140. if(cnt>=num_to_clean) break;
  141. if(conv_last_active_time.begin()==conv_last_active_time.end()) break;
  142. if(it==conv_last_active_time.end())
  143. {
  144. it=conv_last_active_time.begin();
  145. }
  146. if( current_time -it->second >conv_timeout )
  147. {
  148. //mylog(log_info,"inactive conv %u cleared \n",it->first);
  149. //auto old_it=it;
  150. //it++;
  151. u32_t conv= it->first;
  152. it++;
  153. erase_conv(conv);
  154. if(ip_port==0)
  155. {
  156. mylog(log_info,"conv %x cleared\n",conv);
  157. }
  158. else
  159. {
  160. mylog(log_info,"[%s]conv %x cleared\n",ip_port,conv);
  161. }
  162. }
  163. else
  164. {
  165. it++;
  166. }
  167. cnt++;
  168. }
  169. clear_it=it;
  170. return 0;
  171. }
  172. ////////////////////////////////////////////////////////////////////
  173. conn_manager_t::conn_manager_t()
  174. {
  175. //ready_num=0;
  176. mp.reserve(10007);
  177. //fd64_mp.reserve(100007);
  178. clear_it=mp.begin();
  179. last_clear_time=0;
  180. }
  181. int conn_manager_t::exist(ip_port_t ip_port)
  182. {
  183. u64_t u64=ip_port.to_u64();
  184. if(mp.find(u64)!=mp.end())
  185. {
  186. return 1;
  187. }
  188. return 0;
  189. }
  190. conn_info_t *& conn_manager_t::find_p(ip_port_t ip_port) //todo capacity ///done at upper level
  191. //be aware,the adress may change after rehash
  192. {
  193. assert(exist(ip_port));
  194. u64_t u64=ip_port.to_u64();
  195. return mp[u64];
  196. }
  197. conn_info_t & conn_manager_t::find(ip_port_t ip_port) //be aware,the adress may change after rehash
  198. {
  199. assert(exist(ip_port));
  200. u64_t u64=ip_port.to_u64();
  201. return *mp[u64];
  202. }
  203. int conn_manager_t::insert(ip_port_t ip_port)
  204. {
  205. assert(!exist(ip_port));
  206. int bucket_size_before=mp.bucket_count();
  207. mp[ip_port.to_u64()]=new conn_info_t;
  208. int bucket_size_after=mp.bucket_count();
  209. if(bucket_size_after!=bucket_size_before)
  210. clear_it=mp.begin();
  211. return 0;
  212. }
  213. int conn_manager_t::erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it)
  214. {
  215. ////////todo close and erase timer_fd ,check fd64 empty ///dont need
  216. delete(erase_it->second);
  217. mp.erase(erase_it->first);
  218. return 0;
  219. }
  220. int conn_manager_t::clear_inactive()
  221. {
  222. if(get_current_time()-last_clear_time>conn_clear_interval)
  223. {
  224. last_clear_time=get_current_time();
  225. return clear_inactive0();
  226. }
  227. return 0;
  228. }
  229. int conn_manager_t::clear_inactive0()
  230. {
  231. //mylog(log_info,"called\n");
  232. unordered_map<u64_t,conn_info_t*>::iterator it;
  233. unordered_map<u64_t,conn_info_t*>::iterator old_it;
  234. if(disable_conn_clear) return 0;
  235. //map<uint32_t,uint64_t>::iterator it;
  236. int cnt=0;
  237. it=clear_it;//TODO,write it back
  238. int size=mp.size();
  239. int num_to_clean=size/conn_clear_ratio+conn_clear_min; //clear 1/10 each time,to avoid latency glitch
  240. //mylog(log_trace,"mp.size() %d\n", size);
  241. num_to_clean=min(num_to_clean,(int)mp.size());
  242. u64_t current_time=get_current_time();
  243. //mylog(log_info,"here size=%d\n",(int)mp.size());
  244. for(;;)
  245. {
  246. if(cnt>=num_to_clean) break;
  247. if(mp.begin()==mp.end()) break;
  248. if(it==mp.end())
  249. {
  250. it=mp.begin();
  251. }
  252. if(it->second->conv_manager.get_size() >0)
  253. {
  254. //mylog(log_info,"[%s:%d]size %d \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first),(int)it->second->conv_manager.get_size());
  255. it++;
  256. }
  257. else if(current_time<it->second->last_active_time+server_conn_timeout)
  258. {
  259. it++;
  260. }
  261. else
  262. {
  263. mylog(log_info,"[%s:%d]inactive conn cleared \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first));
  264. old_it=it;
  265. it++;
  266. erase(old_it);
  267. }
  268. cnt++;
  269. }
  270. clear_it=it;
  271. return 0;
  272. }