connection.cpp 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. /*
  2. * connection.cpp
  3. *
  4. * Created on: Sep 23, 2017
  5. * Author: root
  6. */
  7. #include "connection.h"
  8. const int disable_conv_clear=0;//a udp connection in the multiplexer is called conversation in this program,conv for short.
  9. const int disable_conn_clear=0;//a raw connection is called conn.
  10. int report_interval=0;
  11. void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection),
  12. //so we have to close the fd when conv expires
  13. {
  14. fd64_t fd64=u64;
  15. assert(fd_manager.exist(fd64));
  16. fd_manager.fd64_close(fd64);
  17. }
  18. ////////////////////////////////////////////////////////////////////
  19. conv_manager_t::conv_manager_t()
  20. {
  21. clear_it=conv_last_active_time.begin();
  22. long long last_clear_time=0;
  23. reserve();
  24. }
  25. conv_manager_t::~conv_manager_t()
  26. {
  27. clear();
  28. }
  29. int conv_manager_t::get_size()
  30. {
  31. return conv_to_u64.size();
  32. }
  33. void conv_manager_t::reserve()
  34. {
  35. u64_to_conv.reserve(10007);
  36. conv_to_u64.reserve(10007);
  37. conv_last_active_time.reserve(10007);
  38. }
  39. void conv_manager_t::clear()
  40. {
  41. //if(disable_conv_clear) return ;//////what was the purpose of this code?
  42. if(client_or_server==server_mode)
  43. {
  44. for(auto it=conv_to_u64.begin();it!=conv_to_u64.end();it++)
  45. {
  46. //int fd=int((it->second<<32u)>>32u);
  47. server_clear_function( it->second);
  48. }
  49. }
  50. u64_to_conv.clear();
  51. conv_to_u64.clear();
  52. conv_last_active_time.clear();
  53. clear_it=conv_last_active_time.begin();
  54. }
  55. u32_t conv_manager_t::get_new_conv()
  56. {
  57. u32_t conv=get_true_random_number_nz();
  58. while(conv_to_u64.find(conv)!=conv_to_u64.end())
  59. {
  60. conv=get_true_random_number_nz();
  61. }
  62. return conv;
  63. }
  64. int conv_manager_t::is_conv_used(u32_t conv)
  65. {
  66. return conv_to_u64.find(conv)!=conv_to_u64.end();
  67. }
  68. int conv_manager_t::is_u64_used(u64_t u64)
  69. {
  70. return u64_to_conv.find(u64)!=u64_to_conv.end();
  71. }
  72. u32_t conv_manager_t::find_conv_by_u64(u64_t u64)
  73. {
  74. assert(is_u64_used(u64));
  75. return u64_to_conv[u64];
  76. }
  77. u64_t conv_manager_t::find_u64_by_conv(u32_t conv)
  78. {
  79. assert(is_conv_used(conv));
  80. return conv_to_u64[conv];
  81. }
  82. int conv_manager_t::update_active_time(u32_t conv)
  83. {
  84. assert(is_conv_used(conv));
  85. return conv_last_active_time[conv]=get_current_time();
  86. }
  87. int conv_manager_t::insert_conv(u32_t conv,u64_t u64)//////todo add capacity ///done at upper level
  88. {
  89. assert(!is_conv_used(conv));
  90. int bucket_size_before=conv_last_active_time.bucket_count();
  91. u64_to_conv[u64]=conv;
  92. conv_to_u64[conv]=u64;
  93. conv_last_active_time[conv]=get_current_time();
  94. int bucket_size_after=conv_last_active_time.bucket_count();
  95. if(bucket_size_after!=bucket_size_before)
  96. clear_it=conv_last_active_time.begin();
  97. return 0;
  98. }
  99. int conv_manager_t::erase_conv(u32_t conv)
  100. {
  101. //if(disable_conv_clear) return 0;
  102. assert(conv_last_active_time.find(conv)!=conv_last_active_time.end());
  103. u64_t u64=conv_to_u64[conv];
  104. if(client_or_server==server_mode)
  105. {
  106. server_clear_function(u64);
  107. }
  108. assert(conv_to_u64.find(conv)!=conv_to_u64.end());
  109. conv_to_u64.erase(conv);
  110. u64_to_conv.erase(u64);
  111. conv_last_active_time.erase(conv);
  112. return 0;
  113. }
  114. int conv_manager_t::clear_inactive(char * ip_port)
  115. {
  116. if(get_current_time()-last_clear_time>conv_clear_interval)
  117. {
  118. last_clear_time=get_current_time();
  119. return clear_inactive0(ip_port);
  120. }
  121. return 0;
  122. }
  123. int conv_manager_t::clear_inactive0(char * ip_port)
  124. {
  125. if(disable_conv_clear) return 0;
  126. //map<uint32_t,uint64_t>::iterator it;
  127. int cnt=0;
  128. auto it=clear_it;
  129. int size=conv_last_active_time.size();
  130. int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch
  131. num_to_clean=min(num_to_clean,size);
  132. u64_t current_time=get_current_time();
  133. for(;;)
  134. {
  135. if(cnt>=num_to_clean) break;
  136. if(conv_last_active_time.begin()==conv_last_active_time.end()) break;
  137. if(it==conv_last_active_time.end())
  138. {
  139. it=conv_last_active_time.begin();
  140. }
  141. if( current_time -it->second >conv_timeout )
  142. {
  143. //mylog(log_info,"inactive conv %u cleared \n",it->first);
  144. //auto old_it=it;
  145. //it++;
  146. u32_t conv= it->first;
  147. it++;
  148. erase_conv(conv);
  149. if(ip_port==0)
  150. {
  151. mylog(log_info,"conv %x cleared\n",conv);
  152. }
  153. else
  154. {
  155. mylog(log_info,"[%s]conv %x cleared\n",ip_port,conv);
  156. }
  157. }
  158. else
  159. {
  160. it++;
  161. }
  162. cnt++;
  163. }
  164. clear_it=it;
  165. return 0;
  166. }
  167. ////////////////////////////////////////////////////////////////////
  168. conn_manager_t::conn_manager_t()
  169. {
  170. //ready_num=0;
  171. mp.reserve(10007);
  172. //fd64_mp.reserve(100007);
  173. clear_it=mp.begin();
  174. last_clear_time=0;
  175. }
  176. int conn_manager_t::exist(ip_port_t ip_port)
  177. {
  178. u64_t u64=ip_port.to_u64();
  179. if(mp.find(u64)!=mp.end())
  180. {
  181. return 1;
  182. }
  183. return 0;
  184. }
  185. conn_info_t *& conn_manager_t::find_p(ip_port_t ip_port) //todo capacity ///done at upper level
  186. //be aware,the adress may change after rehash
  187. {
  188. assert(exist(ip_port));
  189. u64_t u64=ip_port.to_u64();
  190. return mp[u64];
  191. }
  192. conn_info_t & conn_manager_t::find(ip_port_t ip_port) //be aware,the adress may change after rehash
  193. {
  194. assert(exist(ip_port));
  195. u64_t u64=ip_port.to_u64();
  196. return *mp[u64];
  197. }
  198. int conn_manager_t::insert(ip_port_t ip_port)
  199. {
  200. assert(!exist(ip_port));
  201. int bucket_size_before=mp.bucket_count();
  202. mp[ip_port.to_u64()]=new conn_info_t;
  203. int bucket_size_after=mp.bucket_count();
  204. if(bucket_size_after!=bucket_size_before)
  205. clear_it=mp.begin();
  206. return 0;
  207. }
  208. int conn_manager_t::erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it)
  209. {
  210. ////////todo close and erase timer_fd ,check fd64 empty ///dont need
  211. delete(erase_it->second);
  212. mp.erase(erase_it->first);
  213. return 0;
  214. }
  215. int conn_manager_t::clear_inactive()
  216. {
  217. if(get_current_time()-last_clear_time>conn_clear_interval)
  218. {
  219. last_clear_time=get_current_time();
  220. return clear_inactive0();
  221. }
  222. return 0;
  223. }
  224. int conn_manager_t::clear_inactive0()
  225. {
  226. //mylog(log_info,"called\n");
  227. unordered_map<u64_t,conn_info_t*>::iterator it;
  228. unordered_map<u64_t,conn_info_t*>::iterator old_it;
  229. if(disable_conn_clear) return 0;
  230. //map<uint32_t,uint64_t>::iterator it;
  231. int cnt=0;
  232. it=clear_it;//TODO,write it back
  233. int size=mp.size();
  234. int num_to_clean=size/conn_clear_ratio+conn_clear_min; //clear 1/10 each time,to avoid latency glitch
  235. //mylog(log_trace,"mp.size() %d\n", size);
  236. num_to_clean=min(num_to_clean,(int)mp.size());
  237. u64_t current_time=get_current_time();
  238. //mylog(log_info,"here size=%d\n",(int)mp.size());
  239. for(;;)
  240. {
  241. if(cnt>=num_to_clean) break;
  242. if(mp.begin()==mp.end()) break;
  243. if(it==mp.end())
  244. {
  245. it=mp.begin();
  246. }
  247. if(it->second->conv_manager.get_size() >0)
  248. {
  249. //mylog(log_info,"[%s:%d]size %d \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first),(int)it->second->conv_manager.get_size());
  250. it++;
  251. }
  252. else if(current_time<it->second->last_active_time+server_conn_timeout)
  253. {
  254. it++;
  255. }
  256. else
  257. {
  258. mylog(log_info,"[%s:%d]inactive conn cleared \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first));
  259. old_it=it;
  260. it++;
  261. erase(old_it);
  262. }
  263. cnt++;
  264. }
  265. clear_it=it;
  266. return 0;
  267. }