// main.cpp
  1. #include "common.h"
  2. #include "log.h"
  3. using namespace std;
  // Integer aliases used throughout this file.
  using u64_t = unsigned long long; // works on most platforms; avoids needing PRId64
  using i64_t = long long;
  using u32_t = unsigned int;
  using i32_t = int;
  //const u32_t anti_replay_window_size=1000;
  using anti_replay_seq_t = u64_t; // sequence number appended to every tunnelled packet

  // Replay filter / duplication / obscuring settings.
  int disable_replay_filter = 0; // non-zero: anti_replay_t::is_vaild accepts everything
  int dup_num = 3;               // validated to 1..10 by the -d option
  int dup_delay = 900;           // ms (time_after_delay scales it by 1000)
  int iv_min = 2;                // lower bound of the random prefix length in do_obscure
  int iv_max = 30;               // exclusive upper bound; must stay < 256

  // File descriptors; -1 means "not opened yet".
  int random_number_fd = -1;
  int remote_fd = -1;
  int local_fd = -1;

  // Role flags set by -c / -s.
  int is_client = 0;
  int is_server = 0;

  int local_listen_fd = -1;
  int disable_conv_clear = 0; // non-zero: conn_manager_t never closes or forgets fds
  u32_t remote_address_uint32 = 0;

  // Endpoints parsed from -l ip:port and -r ip:port.
  char local_address[100];
  char remote_address[100];
  int local_port = -1;
  int remote_port = -1;

  int multi_process_mode = 0; // set by -p
  const u32_t anti_replay_buff_size = 10000; // capacity of the replay ring buffer
  char key_string[1000] = "secret key";      // XOR key, replaced by -k
  27. int VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV;
  28. struct anti_replay_t
  29. {
  30. u64_t max_packet_received;
  31. u64_t replay_buffer[anti_replay_buff_size];
  32. unordered_set<u64_t> st;
  33. u32_t const_id;
  34. u32_t anti_replay_seq;
  35. int index;
  36. anti_replay_seq_t get_new_seq_for_send()
  37. {
  38. anti_replay_seq_t res=const_id;
  39. res<<=32u;
  40. anti_replay_seq++;
  41. res|=anti_replay_seq;
  42. memset(replay_buffer,0,sizeof(replay_buffer));
  43. st.rehash(anti_replay_buff_size*10);
  44. return res;
  45. }
  46. void prepare()
  47. {
  48. anti_replay_seq=get_true_random_number();//random first seq
  49. const_id=get_true_random_number_nz();
  50. }
  51. anti_replay_t()
  52. {
  53. max_packet_received=0;
  54. index=0;
  55. //memset(window,0,sizeof(window)); //not necessary
  56. }
  57. int is_vaild(u64_t seq)
  58. {
  59. if(disable_replay_filter) return 1;
  60. if(seq==0)
  61. {
  62. mylog(log_debug,"seq=0\n");
  63. return 0;
  64. }
  65. if(st.find(seq)!=st.end() )
  66. {
  67. mylog(log_trace,"seq %llx exist\n",seq);
  68. return 0;
  69. }
  70. if(replay_buffer[index]!=0)
  71. {
  72. assert(st.find(replay_buffer[index])!=st.end());
  73. st.erase(replay_buffer[index]);
  74. }
  75. replay_buffer[index]=seq;
  76. st.insert(seq);
  77. index++;
  78. if(index==int(anti_replay_buff_size)) index=0;
  79. return 1; //for complier check
  80. }
  81. }anti_replay;
  82. struct conn_manager_t //TODO change map to unordered map
  83. {
  84. //typedef hash_map map;
  85. unordered_map<u64_t,u32_t> u64_to_fd; //conv and u64 are both supposed to be uniq
  86. unordered_map<u32_t,u64_t> fd_to_u64;
  87. unordered_map<u32_t,u64_t> fd_last_active_time;
  88. unordered_map<u32_t,u64_t>::iterator clear_it;
  89. unordered_map<u32_t,u64_t>::iterator it;
  90. unordered_map<u32_t,u64_t>::iterator old_it;
  91. //void (*clear_function)(uint64_t u64) ;
  92. long long last_clear_time;
  93. conn_manager_t()
  94. {
  95. clear_it=fd_last_active_time.begin();
  96. long long last_clear_time=0;
  97. //clear_function=0;
  98. }
  99. ~conn_manager_t()
  100. {
  101. clear();
  102. }
  103. int get_size()
  104. {
  105. return fd_to_u64.size();
  106. }
  107. void reserve()
  108. {
  109. u64_to_fd.reserve(10007);
  110. fd_to_u64.reserve(10007);
  111. fd_last_active_time.reserve(10007);
  112. }
  113. void clear()
  114. {
  115. if(disable_conv_clear) return ;
  116. for(it=fd_to_u64.begin();it!=fd_to_u64.end();it++)
  117. {
  118. //int fd=int((it->second<<32u)>>32u);
  119. close( it->first);
  120. }
  121. u64_to_fd.clear();
  122. fd_to_u64.clear();
  123. fd_last_active_time.clear();
  124. clear_it=fd_last_active_time.begin();
  125. }
  126. int exist_fd(u32_t fd)
  127. {
  128. return fd_to_u64.find(fd)!=fd_to_u64.end();
  129. }
  130. int exist_u64(u64_t u64)
  131. {
  132. return u64_to_fd.find(u64)!=u64_to_fd.end();
  133. }
  134. u32_t find_fd_by_u64(u64_t u64)
  135. {
  136. return u64_to_fd[u64];
  137. }
  138. u64_t find_u64_by_fd(u32_t fd)
  139. {
  140. return fd_to_u64[fd];
  141. }
  142. int update_active_time(u32_t fd)
  143. {
  144. return fd_last_active_time[fd]=get_current_time();
  145. }
  146. int insert_fd(u32_t fd,u64_t u64)
  147. {
  148. u64_to_fd[u64]=fd;
  149. fd_to_u64[fd]=u64;
  150. fd_last_active_time[fd]=get_current_time();
  151. return 0;
  152. }
  153. int erase_fd(u32_t fd)
  154. {
  155. if(disable_conv_clear) return 0;
  156. u64_t u64=fd_to_u64[fd];
  157. close(fd);
  158. fd_to_u64.erase(fd);
  159. u64_to_fd.erase(u64);
  160. fd_last_active_time.erase(fd);
  161. return 0;
  162. }
  163. int clear_inactive(char * ip_port=0)
  164. {
  165. if(get_current_time()-last_clear_time>conv_clear_interval)
  166. {
  167. last_clear_time=get_current_time();
  168. return clear_inactive0(ip_port);
  169. }
  170. return 0;
  171. }
  172. int clear_inactive0(char * ip_port)
  173. {
  174. if(disable_conv_clear) return 0;
  175. //map<uint32_t,uint64_t>::iterator it;
  176. int cnt=0;
  177. it=clear_it;
  178. int size=fd_last_active_time.size();
  179. int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch
  180. u64_t current_time=get_current_time();
  181. for(;;)
  182. {
  183. if(cnt>=num_to_clean) break;
  184. if(fd_last_active_time.begin()==fd_last_active_time.end()) break;
  185. if(it==fd_last_active_time.end())
  186. {
  187. it=fd_last_active_time.begin();
  188. }
  189. if( current_time -it->second >conv_timeout )
  190. {
  191. //mylog(log_info,"inactive conv %u cleared \n",it->first);
  192. old_it=it;
  193. it++;
  194. u32_t fd= old_it->first;
  195. erase_fd(old_it->first);
  196. if(ip_port==0)
  197. {
  198. mylog(log_info,"fd %x cleared\n",fd);
  199. }
  200. else
  201. {
  202. mylog(log_info,"[%s]fd %x cleared\n",ip_port,fd);
  203. }
  204. }
  205. else
  206. {
  207. it++;
  208. }
  209. cnt++;
  210. }
  211. return 0;
  212. }
  213. }conn_manager;
  214. typedef u64_t my_time;
  215. struct delay_data
  216. {
  217. int fd;
  218. int times_left;
  219. char * data;
  220. int len;
  221. };
  222. int delay_timer_fd;
  223. multimap<my_time,delay_data> delay_mp;
  224. my_time time_after_delay(my_time time)
  225. {
  226. time+=dup_delay*1000;
  227. return time;
  228. }
  229. int add_to_delay_mp(int fd,int times_left,char * buf,int len)
  230. {
  231. delay_data tmp;
  232. tmp.data = buf;
  233. tmp.fd = fd;
  234. tmp.times_left = times_left;
  235. tmp.len = len;
  236. my_time tmp_time=get_current_time_us();
  237. //clock_gettime(CLOCK_MONOTONIC, &tmp_time);
  238. tmp_time=time_after_delay(tmp_time);
  239. delay_mp.insert(make_pair(tmp_time,tmp));
  240. return 0;
  241. }
  242. int add_and_new(int fd,int times_left,char * buf,int len)
  243. {
  244. char * str= (char *)malloc(len);
  245. memcpy(str,buf,len);
  246. add_to_delay_mp(fd,times_left,str,len);
  247. return 0;
  248. }
  249. multimap<u64_t,delay_data> new_delay_mp;
  // Reaps every child process that has already terminated, without blocking.
  // The signature matches a signal handler (presumably installed for SIGCHLD
  // elsewhere); the signal number is not used.
  void handler(int num)
  {
      (void)num;
      int status=0;
      for(;;)
      {
          const int pid=waitpid(-1,&status,WNOHANG);
          if(pid<=0) break;
          //printf("The child exit with code %d",WEXITSTATUS(status));
      }
  }
  // XORs the first len bytes of input in place with the NUL-terminated key,
  // repeating the key as often as needed. An empty key leaves input untouched.
  // len is not modified.
  void encrypt_0(char * input,int &len,char *key)
  {
      if(key[0]==0) return;
      const char *cursor=key;
      int pos=0;
      while(pos<len)
      {
          if(*cursor==0) cursor=key; // wrap around at the end of the key
          input[pos]^=*cursor;
          ++cursor;
          ++pos;
      }
  }
  // Inverse of encrypt_0. Because the cipher is a plain XOR with the repeating
  // NUL-terminated key, this is the same operation: the first len bytes of
  // input are XORed in place. An empty key leaves input untouched.
  void decrypt_0(char * input,int &len,char *key)
  {
      if(key[0]==0) return;
      const char *cursor=key;
      int pos=0;
      while(pos<len)
      {
          if(*cursor==0) cursor=key; // wrap around at the end of the key
          input[pos]^=*cursor;
          ++cursor;
          ++pos;
      }
  }
  279. int add_seq(char * data,int &data_len )
  280. {
  281. if(data_len<0) return -1;
  282. anti_replay_seq_t seq=anti_replay.get_new_seq_for_send();
  283. seq=hton64(seq);
  284. memcpy(data+data_len,&seq,sizeof(seq));
  285. data_len+=sizeof(seq);
  286. return 0;
  287. }
  288. int remove_seq(char * data,int &data_len)
  289. {
  290. anti_replay_seq_t seq;
  291. if(data_len<int(sizeof(seq))) return -1;
  292. data_len-=sizeof(seq);
  293. memcpy(&seq,data+data_len,sizeof(seq));
  294. seq=ntoh64(seq);
  295. if(anti_replay.is_vaild(seq)==0)
  296. {
  297. mylog(log_trace,"seq %llx dropped bc of replay-filter\n ",seq);
  298. return -1; //TODO for test
  299. }
  300. return 0;
  301. }
  302. int do_obscure(const char * input, int in_len,char *output,int &out_len)
  303. {
  304. //memcpy(output,input,in_len);
  305. // out_len=in_len;
  306. //return 0;
  307. int i, j, k;
  308. if (in_len > 65535||in_len<0)
  309. return -1;
  310. int iv_len=iv_min+rand()%(iv_max-iv_min);
  311. get_true_random_chars(output,iv_len);
  312. memcpy(output+iv_len,input,in_len);
  313. output[iv_len+in_len]=(uint8_t)iv_len;
  314. output[iv_len+in_len]^=output[0];
  315. output[iv_len+in_len]^=key_string[0];
  316. for(i=0,j=0,k=1;i<in_len;i++,j++,k++)
  317. {
  318. if(j==iv_len) j=0;
  319. if(key_string[k]==0)k=0;
  320. output[iv_len+i]^=output[j];
  321. output[iv_len+i]^=key_string[k];
  322. }
  323. out_len=iv_len+in_len+1;
  324. return 0;
  325. }
  326. int de_obscure(const char * input, int in_len,char *output,int &out_len)
  327. {
  328. //memcpy(output,input,in_len);
  329. //out_len=in_len;
  330. //return 0;
  331. int i, j, k;
  332. if (in_len > 65535||in_len<0)
  333. {
  334. mylog(log_debug,"in_len > 65535||in_len<0 , %d",in_len);
  335. return -1;
  336. }
  337. int iv_len= int ((uint8_t)(input[in_len-1]^input[0]^key_string[0]) );
  338. out_len=in_len-1-iv_len;
  339. if(out_len<0)
  340. {
  341. mylog(log_debug,"%d %d\n",in_len,out_len);
  342. return -1;
  343. }
  344. for(i=0,j=0,k=1;i<in_len;i++,j++,k++)
  345. {
  346. if(j==iv_len) j=0;
  347. if(key_string[k]==0)k=0;
  348. output[i]=input[iv_len+i]^input[j]^key_string[k];
  349. }
  350. return 0;
  351. }
  352. void check_delay_map()
  353. {
  354. //printf("<<<begin");
  355. if(!delay_mp.empty())
  356. {
  357. my_time current_time;
  358. multimap<my_time,delay_data>::iterator it;
  359. //printf("<map_size:%d>",delay_mp.size());
  360. //lfflush(stdout);
  361. while(1)
  362. {
  363. int ret=0;
  364. it=delay_mp.begin();
  365. if(it==delay_mp.end()) break;
  366. current_time=get_current_time_us();
  367. if(it->first < current_time||it->first ==current_time)
  368. {
  369. //send packet
  370. //printf("<%d>",it->second.len);
  371. if(multi_process_mode)
  372. {
  373. if ((is_client && it->second.fd == remote_fd)
  374. || (is_server && it->second.fd == local_fd)) {
  375. char new_data[buf_len];
  376. int new_len=0;
  377. do_obscure(it->second.data, it->second.len, new_data,
  378. new_len);
  379. ret = send(it->second.fd, new_data, new_len, 0);
  380. } else {
  381. ret = send(it->second.fd, it->second.data,
  382. it->second.len, 0);
  383. }
  384. if (ret < 0) {
  385. mylog(log_fatal,"send return %d at @300", ret);
  386. myexit(1);
  387. }
  388. }
  389. else
  390. {
  391. if(is_client)
  392. {
  393. char new_data[buf_len];
  394. int new_len=0;
  395. do_obscure(it->second.data, it->second.len, new_data,
  396. new_len);
  397. ret = send(it->second.fd, new_data, new_len, 0);
  398. }
  399. else
  400. {
  401. if(conn_manager.exist_fd(it->second.fd))
  402. {
  403. u64_t u64=conn_manager.find_u64_by_fd(it->second.fd);
  404. sockaddr_in tmp_sockaddr;
  405. memset(&tmp_sockaddr,0,sizeof(tmp_sockaddr));
  406. tmp_sockaddr.sin_family = AF_INET;
  407. tmp_sockaddr.sin_addr.s_addr = (u64 >> 32u);
  408. tmp_sockaddr.sin_port = htons(uint16_t((u64 << 32u) >> 32u));
  409. char new_data[buf_len];
  410. int new_len=0;
  411. do_obscure(it->second.data, it->second.len, new_data,
  412. new_len);
  413. ret = sendto(local_listen_fd, new_data,
  414. new_len , 0,
  415. (struct sockaddr *) &tmp_sockaddr,
  416. sizeof(tmp_sockaddr));
  417. //ret = send(it->second.fd, it->second.data,
  418. // it->second.len, 0);
  419. }
  420. else
  421. {
  422. it->second.times_left=0;
  423. }
  424. }
  425. if (ret < 0) {
  426. mylog(log_debug,"send return %d at @300", ret);
  427. }
  428. }
  429. if(it->second.times_left>1)
  430. {
  431. //delay_mp.insert(pair<my_time,delay_data>(current_time));
  432. add_to_delay_mp(it->second.fd,it->second.times_left-1,it->second.data,it->second.len);
  433. }
  434. else
  435. {
  436. free(it->second.data);
  437. }
  438. delay_mp.erase(it);
  439. }
  440. else
  441. {
  442. break;
  443. }
  444. }
  445. if(!delay_mp.empty())
  446. {
  447. itimerspec its;
  448. memset(&its.it_interval,0,sizeof(its.it_interval));
  449. its.it_value.tv_sec=delay_mp.begin()->first/1000000llu;
  450. its.it_value.tv_nsec=(delay_mp.begin()->first%1000000llu)*1000llu;
  451. timerfd_settime(delay_timer_fd,TFD_TIMER_ABSTIME,&its,0);
  452. }
  453. }
  454. //printf("end");
  455. }
  456. int create_new_udp(int &new_udp_fd)
  457. {
  458. struct sockaddr_in remote_addr_in;
  459. socklen_t slen = sizeof(sockaddr_in);
  460. memset(&remote_addr_in, 0, sizeof(remote_addr_in));
  461. remote_addr_in.sin_family = AF_INET;
  462. remote_addr_in.sin_port = htons(remote_port);
  463. remote_addr_in.sin_addr.s_addr = remote_address_uint32;
  464. new_udp_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  465. if (new_udp_fd < 0) {
  466. mylog(log_warn, "create udp_fd error\n");
  467. return -1;
  468. }
  469. setnonblocking(new_udp_fd);
  470. set_buf_size(new_udp_fd);
  471. mylog(log_debug, "created new udp_fd %d\n", new_udp_fd);
  472. int ret = connect(new_udp_fd, (struct sockaddr *) &remote_addr_in, slen);
  473. if (ret != 0) {
  474. mylog(log_warn, "udp fd connect fail\n");
  475. close(new_udp_fd);
  476. return -1;
  477. }
  478. return 0;
  479. }
  480. int set_timer(int epollfd,int &timer_fd)
  481. {
  482. int ret;
  483. epoll_event ev;
  484. itimerspec its;
  485. memset(&its,0,sizeof(its));
  486. if((timer_fd=timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK)) < 0)
  487. {
  488. mylog(log_fatal,"timer_fd create error\n");
  489. myexit(1);
  490. }
  491. its.it_interval.tv_sec=(timer_interval/1000);
  492. its.it_interval.tv_nsec=(timer_interval%1000)*1000ll*1000ll;
  493. its.it_value.tv_nsec=1; //imidiately
  494. timerfd_settime(timer_fd,0,&its,0);
  495. ev.events = EPOLLIN;
  496. ev.data.u64 = timer_fd;
  497. ret=epoll_ctl(epollfd, EPOLL_CTL_ADD, timer_fd, &ev);
  498. if (ret < 0) {
  499. mylog(log_fatal,"epoll_ctl return %d\n", ret);
  500. myexit(-1);
  501. }
  502. return 0;
  503. }
  504. int event_loop()
  505. {
  506. struct sockaddr_in local_me, local_other;
  507. local_listen_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  508. int yes = 1;
  509. //setsockopt(local_listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
  510. set_buf_size(local_listen_fd);
  511. setnonblocking(local_listen_fd);
  512. //char data[buf_len];
  513. //char *data=data0;
  514. socklen_t slen = sizeof(sockaddr_in);
  515. memset(&local_me, 0, sizeof(local_me));
  516. local_me.sin_family = AF_INET;
  517. local_me.sin_port = htons(local_port);
  518. local_me.sin_addr.s_addr = inet_addr(local_address);
  519. if (bind(local_listen_fd, (struct sockaddr*) &local_me, slen) == -1)
  520. {
  521. mylog(log_fatal,"socket bind error");
  522. myexit(1);
  523. }
  524. int epollfd = epoll_create1(0);
  525. const int max_events = 4096;
  526. struct epoll_event ev, events[max_events];
  527. if (epollfd < 0)
  528. {
  529. mylog(log_fatal,"epoll created return %d\n", epollfd);
  530. myexit(-1);
  531. }
  532. ev.events = EPOLLIN;
  533. ev.data.fd = local_listen_fd;
  534. int ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, local_listen_fd, &ev);
  535. if(ret!=0)
  536. {
  537. mylog(log_fatal,"epoll created return %d\n", epollfd);
  538. myexit(-1);
  539. }
  540. int clear_timer_fd=-1;
  541. set_timer(epollfd,clear_timer_fd);
  542. if ((delay_timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
  543. {
  544. mylog(log_fatal,"timer_fd create error");
  545. myexit(1);
  546. }
  547. ev.events = EPOLLIN;
  548. ev.data.fd = delay_timer_fd;
  549. itimerspec zero_its;
  550. memset(&zero_its, 0, sizeof(zero_its));
  551. timerfd_settime(delay_timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
  552. epoll_ctl(epollfd, EPOLL_CTL_ADD, delay_timer_fd, &ev);
  553. if (ret < 0)
  554. {
  555. mylog(log_fatal,"epoll_ctl return %d\n", ret);
  556. myexit(-1);
  557. }
  558. for (;;)
  559. {
  560. int nfds = epoll_wait(epollfd, events, max_events, 180 * 1000); //3mins
  561. if (nfds < 0)
  562. {
  563. mylog(log_fatal,"epoll_wait return %d\n", nfds);
  564. myexit(-1);
  565. }
  566. int n;
  567. int clear_triggered=0;
  568. for (n = 0; n < nfds; ++n)
  569. {
  570. if (events[n].data.fd == local_listen_fd) //data income from local end
  571. {
  572. char data[buf_len];
  573. int data_len;
  574. if ((data_len = recvfrom(local_listen_fd, data, buf_len, 0,
  575. (struct sockaddr *) &local_other, &slen)) == -1) //<--first packet from a new ip:port turple
  576. {
  577. mylog(log_error,"recv_from error,this shouldnt happen,but we can try to continue");
  578. //myexit(1);
  579. continue;
  580. }
  581. data[data_len] = 0; //for easier debug
  582. u64_t u64=pack_u64(local_other.sin_addr.s_addr,ntohs(local_other.sin_port));
  583. if(!conn_manager.exist_u64(u64))
  584. {
  585. int new_udp_fd;
  586. if(create_new_udp(new_udp_fd)!=0)
  587. {
  588. continue;
  589. }
  590. struct epoll_event ev;
  591. mylog(log_trace, "u64: %lld\n", u64);
  592. ev.events = EPOLLIN;
  593. ev.data.fd = new_udp_fd;
  594. ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, new_udp_fd, &ev);
  595. if (ret != 0) {
  596. mylog(log_warn, "add udp_fd error \n");
  597. //perror("why?");
  598. close(new_udp_fd);
  599. continue;
  600. }
  601. mylog(log_info,"created new udp\n");
  602. conn_manager.insert_fd(new_udp_fd,u64);
  603. }
  604. int new_udp_fd=conn_manager.find_fd_by_u64(u64);
  605. conn_manager.update_active_time(new_udp_fd);
  606. int ret;
  607. if(is_client)
  608. {
  609. add_seq(data,data_len);
  610. char new_data[buf_len];
  611. int new_len=0;
  612. do_obscure(data, data_len, new_data, new_len);
  613. ret = send(new_udp_fd, new_data,new_len, 0);
  614. if(dup_num>1)
  615. {
  616. add_and_new(new_udp_fd, dup_num - 1, data, data_len);
  617. }
  618. }
  619. else
  620. {
  621. char new_data[buf_len];
  622. int new_len;
  623. if (de_obscure(data, data_len, new_data, new_len) != 0) {
  624. mylog(log_trace,"de_obscure failed \n");
  625. continue;
  626. }
  627. if (remove_seq(new_data, new_len) != 0) {
  628. mylog(log_trace,"remove_seq failed \n");
  629. continue;
  630. }
  631. ret = send(new_udp_fd, new_data,new_len, 0);
  632. }
  633. if (ret < 0) {
  634. mylog(log_warn, "send returned %d\n", ret);
  635. //perror("what happened????");
  636. }
  637. }
  638. else if(events[n].data.fd == clear_timer_fd)
  639. {
  640. clear_triggered=1;
  641. }
  642. else if (events[n].data.fd == delay_timer_fd)
  643. {
  644. uint64_t value;
  645. read(delay_timer_fd, &value, 8);
  646. //printf("<timerfd_triggered, %d>",delay_mp.size());
  647. //fflush(stdout);
  648. }
  649. else
  650. {
  651. int udp_fd=events[n].data.fd;
  652. if(!conn_manager.exist_fd(udp_fd)) continue;
  653. char data[buf_len];
  654. int data_len =recv(udp_fd,data,buf_len,0);
  655. if(data_len<0)
  656. {
  657. mylog(log_warn, "recv failed %d\n", data_len);
  658. continue;
  659. }
  660. assert(conn_manager.exist_fd(udp_fd));
  661. conn_manager.update_active_time(udp_fd);
  662. u64_t u64=conn_manager.find_u64_by_fd(udp_fd);
  663. sockaddr_in tmp_sockaddr;
  664. memset(&tmp_sockaddr,0,sizeof(tmp_sockaddr));
  665. tmp_sockaddr.sin_family = AF_INET;
  666. tmp_sockaddr.sin_addr.s_addr = (u64 >> 32u);
  667. tmp_sockaddr.sin_port = htons(uint16_t((u64 << 32u) >> 32u));
  668. if(is_client)
  669. {
  670. char new_data[buf_len];
  671. int new_len;
  672. if (de_obscure(data, data_len, new_data, new_len) != 0) {
  673. mylog(log_debug,"data_len=%d \n",data_len);
  674. continue;
  675. }
  676. if (remove_seq(new_data, new_len) != 0) {
  677. mylog(log_debug,"remove_seq error \n");
  678. continue;
  679. }
  680. ret = sendto(local_listen_fd, new_data,
  681. new_len , 0,
  682. (struct sockaddr *) &tmp_sockaddr,
  683. sizeof(tmp_sockaddr));
  684. }
  685. else
  686. {
  687. add_seq(data,data_len);
  688. char new_data[buf_len];
  689. int new_len=0;
  690. do_obscure(data, data_len, new_data, new_len);
  691. if(dup_num>1)
  692. {
  693. add_and_new(udp_fd, dup_num - 1, data, data_len);
  694. }
  695. ret = sendto(local_listen_fd, new_data,
  696. new_len , 0,
  697. (struct sockaddr *) &tmp_sockaddr,
  698. sizeof(tmp_sockaddr));
  699. }
  700. if (ret < 0) {
  701. mylog(log_warn, "sento returned %d\n", ret);
  702. //perror("ret<0");
  703. }
  704. mylog(log_trace, "%s :%d\n", inet_ntoa(tmp_sockaddr.sin_addr),
  705. ntohs(tmp_sockaddr.sin_port));
  706. mylog(log_trace, "%d byte sent\n", ret);
  707. }
  708. }
  709. check_delay_map();
  710. if(clear_triggered) // 删除操作在epoll event的最后进行,防止event cache中的fd失效。
  711. {
  712. u64_t value;
  713. read(clear_timer_fd, &value, 8);
  714. mylog(log_trace, "timer!\n");
  715. conn_manager.clear_inactive();
  716. }
  717. }
  718. myexit(0);
  719. return 0;
  720. }
  721. void process_arg(int argc, char *argv[])
  722. {
  723. int i, j, k;
  724. int opt;
  725. static struct option long_options[] =
  726. {
  727. {"log-level", required_argument, 0, 1},
  728. {"log-position", no_argument, 0, 1},
  729. {"disable-color", no_argument, 0, 1},
  730. {"disable-filter", no_argument, 0, 1},
  731. {NULL, 0, 0, 0}
  732. };
  733. int option_index = 0;
  734. for (i = 0; i < argc; i++)
  735. {
  736. if(strcmp(argv[i],"--log-level")==0)
  737. {
  738. if(i<argc -1)
  739. {
  740. sscanf(argv[i+1],"%d",&log_level);
  741. if(0<=log_level&&log_level<log_end)
  742. {
  743. }
  744. else
  745. {
  746. log_bare(log_fatal,"invalid log_level\n");
  747. myexit(-1);
  748. }
  749. }
  750. }
  751. if(strcmp(argv[i],"--disable-color")==0)
  752. {
  753. enable_log_color=0;
  754. }
  755. }
  756. printf("argc=%d ", argc);
  757. for (i = 0; i < argc; i++)
  758. printf("%s ", argv[i]);
  759. printf("\n");
  760. if (argc == 1)
  761. {
  762. printf(
  763. "proc -c/-s -l ip:port -r ip:port [-n dup_times] [-t dup_delay(1000=1ms)] \n");
  764. myexit( -1);
  765. }
  766. int no_l = 1, no_r = 1;
  767. while ((opt = getopt_long(argc, argv, "l:r:d:t:hcspk:",long_options,&option_index)) != -1)
  768. {
  769. //string opt_key;
  770. //opt_key+=opt;
  771. switch (opt)
  772. {
  773. case 'p':
  774. multi_process_mode=1;
  775. break;
  776. case 'k':
  777. sscanf(optarg,"%s\n",key_string);
  778. if(strlen(key_string)==0)
  779. {
  780. mylog(log_fatal,"key len=0??\n");
  781. myexit(-1);
  782. }
  783. break;
  784. case 'd':
  785. dup_num=-1;
  786. sscanf(optarg,"%d\n",&dup_num);
  787. if(dup_num<1 ||dup_num>10)
  788. {
  789. mylog(log_fatal,"dup_num must be between 1 and 10\n");
  790. myexit(-1);
  791. }
  792. break;
  793. case 't':
  794. dup_delay=-1;
  795. sscanf(optarg,"%d\n",&dup_delay);
  796. if(dup_delay<1||dup_delay>1000*1000)
  797. {
  798. mylog(log_fatal,"dup_delay must be between 1 and 10\n");
  799. myexit(-1);
  800. }
  801. break;
  802. case 'c':
  803. is_client = 1;
  804. break;
  805. case 's':
  806. is_server = 1;
  807. break;
  808. case 'l':
  809. no_l = 0;
  810. if (strchr(optarg, ':') != 0)
  811. {
  812. sscanf(optarg, "%[^:]:%d", local_address, &local_port);
  813. }
  814. else
  815. {
  816. mylog(log_fatal," -r ip:port\n");
  817. myexit(1);
  818. strcpy(local_address, "127.0.0.1");
  819. sscanf(optarg, "%d", &local_port);
  820. }
  821. break;
  822. case 'r':
  823. no_r = 0;
  824. if (strchr(optarg, ':') != 0)
  825. {
  826. //printf("in :\n");
  827. //printf("%s\n",optarg);
  828. sscanf(optarg, "%[^:]:%d", remote_address, &remote_port);
  829. //printf("%d\n",remote_port);
  830. }
  831. else
  832. {
  833. mylog(log_fatal," -r ip:port\n");
  834. myexit(1);
  835. strcpy(remote_address, "127.0.0.1");
  836. sscanf(optarg, "%d", &remote_port);
  837. }
  838. break;
  839. case 'h':
  840. break;
  841. case 1:
  842. if(strcmp(long_options[option_index].name,"log-level")==0)
  843. {
  844. }
  845. else if(strcmp(long_options[option_index].name,"disable-filter")==0)
  846. {
  847. disable_replay_filter=1;
  848. //enable_log_color=0;
  849. }
  850. else if(strcmp(long_options[option_index].name,"disable-color")==0)
  851. {
  852. //enable_log_color=0;
  853. }
  854. else if(strcmp(long_options[option_index].name,"log-position")==0)
  855. {
  856. enable_log_position=1;
  857. }
  858. break;
  859. default:
  860. mylog(log_fatal,"unknown option <%x>", opt);
  861. myexit(-1);
  862. }
  863. }
  864. if (no_l)
  865. mylog(log_fatal,"error: -i not found\n");
  866. if (no_r)
  867. mylog(log_fatal,"error: -o not found\n");
  868. if (no_l || no_r)
  869. myexit(-1);
  870. if (is_client == 0 && is_server == 0)
  871. {
  872. mylog(log_fatal,"-s -c hasnt been set\n");
  873. myexit(-1);
  874. }
  875. if (is_client == 1 && is_server == 1)
  876. {
  877. mylog(log_fatal,"-s -c cant be both set\n");
  878. myexit(-1);
  879. }
  880. }
  881. int multi_process()
  882. {
  883. struct sockaddr_in local_me, local_other;
  884. local_listen_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  885. int yes = 1;
  886. setsockopt(local_listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
  887. set_buf_size(local_listen_fd);
  888. char data[buf_len];
  889. //char *data=data0;
  890. socklen_t slen = sizeof(sockaddr_in);
  891. memset(&local_me, 0, sizeof(local_me));
  892. local_me.sin_family = AF_INET;
  893. local_me.sin_port = htons(local_port);
  894. local_me.sin_addr.s_addr = inet_addr(local_address);
  895. if (bind(local_listen_fd, (struct sockaddr*) &local_me, slen) == -1)
  896. {
  897. mylog(log_fatal,"socket bind error");
  898. myexit(1);
  899. }
  900. while (1)
  901. {
  902. int data_len;
  903. if ((data_len = recvfrom(local_listen_fd, data, buf_len, 0,
  904. (struct sockaddr *) &local_other, &slen)) == -1) //<--first packet from a new ip:port turple
  905. {
  906. mylog(log_fatal,"recv_from error");
  907. myexit(1);
  908. }
  909. mylog(log_debug,"received packet from %s:%d\n", inet_ntoa(local_other.sin_addr),
  910. ntohs(local_other.sin_port));
  911. data[data_len] = 0;
  912. printf("recv_len: %d\n", data_len);
  913. fflush(stdout);
  914. if (is_server)
  915. {
  916. char new_data[buf_len];
  917. int new_len;
  918. if(de_obscure(data,data_len,new_data,new_len)!=0)
  919. {
  920. printf("remove_padding error!\n");
  921. continue;
  922. }
  923. memcpy(data,new_data,new_len);
  924. data_len=new_len;
  925. if (remove_seq(data, data_len) != 0)
  926. {
  927. printf("remove_seq error!\n");
  928. continue;
  929. }
  930. //data=new_data;
  931. }
  932. local_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  933. //local_me.sin_addr.s_addr=inet_addr("127.0.0.1");
  934. setsockopt(local_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
  935. if (bind(local_fd, (struct sockaddr*) &local_me, slen) == -1) //偷懒的方法,有潜在问题
  936. {
  937. printf("socket bind error in chilld");
  938. myexit(1);
  939. }
  940. int ret = connect(local_fd, (struct sockaddr *) &local_other, slen); //偷懒的方法,有潜在问题
  941. if (fork() == 0) //子
  942. {
  943. if (ret != 0)
  944. {
  945. printf("connect return %d @1\n", ret);
  946. myexit(1);
  947. }
  948. close(local_listen_fd);
  949. struct sockaddr_in remote_me, remote_other;
  950. memset(&remote_other, 0, sizeof(remote_other));
  951. remote_other.sin_family = AF_INET;
  952. //printf("remote_address=%s remote_port=%d\n",remote_address,remote_port);
  953. remote_other.sin_port = htons(remote_port);
  954. remote_other.sin_addr.s_addr = inet_addr(remote_address);
  955. remote_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  956. ret = connect(remote_fd, (struct sockaddr *) &remote_other, slen);
  957. if (ret != 0)
  958. {
  959. printf("connect return %d @2\n", ret);
  960. myexit(1);
  961. }
  962. if (is_client)
  963. {
  964. add_seq(data, data_len);
  965. char new_data[buf_len];
  966. int new_len;
  967. do_obscure(data, data_len, new_data, new_len);
  968. ret = send(remote_fd, new_data, new_len, 0); //<----send the packet receved by father process ,only for this packet
  969. printf("send return %d\n", ret);
  970. if(dup_num > 1)
  971. {
  972. add_and_new(remote_fd, dup_num - 1, data, data_len);
  973. }
  974. }
  975. else
  976. {
  977. ret = send(remote_fd, data, data_len, 0);
  978. printf("send return %d\n", ret);
  979. }
  980. if (ret < 0)
  981. myexit(-1);
  982. setnonblocking(remote_fd);
  983. set_buf_size(remote_fd);
  984. setnonblocking(local_fd);
  985. set_buf_size(local_fd);
  986. int epollfd = epoll_create1(0);
  987. const int max_events = 4096;
  988. struct epoll_event ev, events[max_events];
  989. if (epollfd < 0)
  990. {
  991. printf("epoll return %d\n", epollfd);
  992. myexit(-1);
  993. }
  994. ev.events = EPOLLIN;
  995. ev.data.fd = local_fd;
  996. ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, local_fd, &ev);
  997. if (ret < 0)
  998. {
  999. printf("epoll_ctl return %d\n", ret);
  1000. myexit(-1);
  1001. }
  1002. ev.events = EPOLLIN;
  1003. ev.data.fd = remote_fd;
  1004. ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, remote_fd, &ev);
  1005. if (ret < 0)
  1006. {
  1007. printf("epoll_ctl return %d\n", ret);
  1008. myexit(-1);
  1009. }
  1010. if ((delay_timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
  1011. {
  1012. printf("timer_fd create error");
  1013. myexit(1);
  1014. }
  1015. ev.events = EPOLLIN;
  1016. ev.data.fd = delay_timer_fd;
  1017. itimerspec zero_its;
  1018. memset(&zero_its, 0, sizeof(zero_its));
  1019. timerfd_settime(delay_timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
  1020. epoll_ctl(epollfd, EPOLL_CTL_ADD, delay_timer_fd, &ev);
  1021. if (ret < 0)
  1022. {
  1023. printf("epoll_ctl return %d\n", ret);
  1024. myexit(-1);
  1025. }
  1026. check_delay_map();
  1027. for (;;)
  1028. {
  1029. int nfds = epoll_wait(epollfd, events, max_events, 180 * 1000); //3mins
  1030. if (nfds <= 0)
  1031. {
  1032. printf("epoll_wait return %d\n", nfds);
  1033. myexit(-1);
  1034. }
  1035. int n;
  1036. for (n = 0; n < nfds; ++n)
  1037. {
  1038. if (events[n].data.fd == local_fd) //data income from local end
  1039. {
  1040. data_len = recv(local_fd, data, buf_len, 0);
  1041. if (data_len < 0)
  1042. {
  1043. printf("recv return %d @1", data_len);
  1044. myexit(1);
  1045. }
  1046. data[data_len] = 0;
  1047. printf("len %d received from child@1\n", data_len);
  1048. //printf("%s received from child@1\n",buf);
  1049. //printf("before send %s\n",buf);
  1050. if(is_client)
  1051. {
  1052. add_seq(data,data_len);
  1053. char new_data[buf_len];
  1054. int new_len=0;
  1055. do_obscure(data, data_len, new_data, new_len);
  1056. ret = send(remote_fd, new_data, new_len, 0);
  1057. if(dup_num>1)
  1058. {
  1059. add_and_new(remote_fd, dup_num - 1, data, data_len);
  1060. }
  1061. }
  1062. else
  1063. {
  1064. char new_data[buf_len];
  1065. int new_len;
  1066. if(de_obscure(data,data_len,new_data,new_len)!=0) {printf("error at line %d\n",__LINE__);continue;}
  1067. if(remove_seq(new_data,new_len)!=0) {printf("error at line %d\n",__LINE__);continue;}
  1068. ret = send(remote_fd, new_data, new_len, 0);
  1069. }
  1070. if (ret < 0)
  1071. {
  1072. printf("send return %d at @1", ret);
  1073. myexit(1);
  1074. }
  1075. }
  1076. else if (events[n].data.fd == remote_fd)
  1077. {
  1078. data_len = recv(remote_fd, data, buf_len, 0);
  1079. if (data_len < 0)
  1080. {
  1081. printf("recv return -1 @2");
  1082. myexit(1);
  1083. }
  1084. data[data_len] = 0;
  1085. printf("len %d received from child@1\n", data_len);
  1086. //printf("%s received from child@2\n",buf);
  1087. if(is_client)
  1088. {
  1089. char new_data[buf_len];
  1090. int new_len;
  1091. if(de_obscure(data,data_len,new_data,new_len)!=0) {printf("error at line %d\n",__LINE__);continue;}
  1092. if(remove_seq(new_data,new_len)!=0) {printf("error at line %d\n",__LINE__);continue;}
  1093. ret = send(local_fd, new_data, new_len, 0);
  1094. }
  1095. else
  1096. {
  1097. add_seq(data,data_len);
  1098. char new_data[buf_len];
  1099. int new_len=0;
  1100. do_obscure(data, data_len, new_data, new_len);
  1101. ret = send(local_fd, new_data, new_len, 0);
  1102. if(dup_num>1)
  1103. {
  1104. add_and_new(local_fd, dup_num - 1, data, data_len);
  1105. }
  1106. }
  1107. if (ret < 0)
  1108. {
  1109. printf("send return %d @2", ret);
  1110. myexit(1);
  1111. }
  1112. }
  1113. else if (events[n].data.fd == delay_timer_fd)
  1114. {
  1115. uint64_t value;
  1116. read(delay_timer_fd, &value, 8);
  1117. //printf("<timerfd_triggered, %d>",delay_mp.size());
  1118. //fflush(stdout);
  1119. }
  1120. } //end for n = 0; n < nfds
  1121. check_delay_map();
  1122. }
  1123. myexit(0);
  1124. }
  1125. else //if(fork()==0) ... else
  1126. { //fork 's father process
  1127. close(local_fd); //father process only listen to local_listen_fd,so,close this fd
  1128. }
  1129. } //while(1)end
  1130. return 0;
  1131. }
  1132. int main(int argc, char *argv[])
  1133. {
  1134. //printf("%lld\n",get_current_time_us());
  1135. //printf("%lld\n",get_current_time_us());
  1136. //printf("%lld\n",get_current_time_us());
  1137. //printf("%lld\n",get_current_time());
  1138. dup2(1, 2); //redirect stderr to stdout
  1139. int i, j, k;
  1140. process_arg(argc,argv);
  1141. init_random_number_fd();
  1142. anti_replay.prepare();
  1143. signal(SIGCHLD, handler);
  1144. mylog(log_info,"test\n");
  1145. remote_address_uint32=inet_addr(remote_address);
  1146. if(!multi_process_mode)
  1147. {
  1148. event_loop();
  1149. }
  1150. else
  1151. {
  1152. multi_process();
  1153. }
  1154. return 0;
  1155. }