Browse Source

added conv manager

wangyu 8 years ago
parent
commit
dd24b03e33
1 changed files with 174 additions and 65 deletions
  1. 174 65
      main.cpp

+ 174 - 65
main.cpp

@@ -48,14 +48,19 @@ char local_address[100], remote_address[100],source_address[100];
 int local_port = -1, remote_port = -1;
 int epollfd ;
 
+uint32_t const_id=0;
+uint32_t oppsite_const_id;
+
 uint32_t my_id=0;
 uint32_t oppsite_id=0;
+
 uint32_t conv_id=0;
+uint32_t oppsite_const_id=0;
 
 const int handshake_timeout=2000;
 
 const int heartbeat_timeout=10000;
-const int udp_timeout=2000;
+const int udp_timeout=3000;
 
 const int heartbeat_interval=1000;
 
@@ -78,7 +83,7 @@ int bind_fd;
 
 int first_data_packet=0;
 
-const int seq_mode=2;  //0  dont  increase /1 increase   //increase randomly,about every 10 packet
+const int seq_mode=2;  //0  dont  increase /1 increase   //increase randomly,about every 5 packet
 
 const uint64_t epoll_timer_fd_sn=1;
 const uint64_t epoll_raw_recv_fd_sn=2;
@@ -150,6 +155,16 @@ uint8_t key[]={1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,   0,0,0,0};
 
 
 const int window_size=2000;
+
+
+uint32_t get_true_random_number()
+{
+	uint32_t ret;
+	int fd=open("/dev/urandom",O_RDONLY);
+	read(fd,&ret,sizeof(ret));
+	return htonl(ret);
+}
+
 struct anti_replay_t
 {
 	uint64_t max_packet_received;
@@ -159,7 +174,7 @@ struct anti_replay_t
 	{
 		disabled=0;
 		max_packet_received=0;
-		//memset(window,0,sizeof(window));
+		//memset(window,0,sizeof(window)); //not necessary
 	}
 	void re_init()
 	{
@@ -371,6 +386,101 @@ long long get_current_time()
 	clock_gettime(CLOCK_MONOTONIC, &tmp_time);
 	return tmp_time.tv_sec*1000+tmp_time.tv_nsec/(1000*1000ll);
 }
+const int conv_timeout=60000; //60 second
+const int conv_clear_ratio=10;
+
+struct conv_manager
+{
+	map<uint64_t,uint32_t> u64_to_conv;  //conv and u64 are both supposed to be uniq
+	map<uint32_t,uint64_t> conv_to_u64;
+
+	map<uint32_t,uint64_t> conv_last_active_time;
+
+	map<uint32_t,uint64_t>::iterator clear_it;
+
+	conv_manager()
+	{
+		clear_it=conv_last_active_time.begin();
+	}
+	uint32_t get_new_conv()
+	{
+		uint32_t conv=get_true_random_number();
+		while(conv_to_u64.find(conv)!=conv_to_u64.end())
+		{
+			conv=get_true_random_number();
+		}
+		return conv;
+	}
+	int is_conv_used(uint32_t conv)
+	{
+		return conv_to_u64.find(conv)!=conv_to_u64.end();
+	}
+	int is_u64_used(uint64_t u64)
+	{
+		return u64_to_conv.find(u64)!=u64_to_conv.end();
+	}
+	int find_conv_by_u64(uint64_t u64)
+	{
+		return u64_to_conv[u64];
+	}
+	int find_u64_by_conv(uint32_t conv)
+	{
+		return conv_to_u64[conv];
+	}
+	int update_active_time(uint32_t conv)
+	{
+		return conv_last_active_time[conv]=get_current_time();
+	}
+	int insert_conv(uint32_t conv,uint64_t u64)
+	{
+		u64_to_conv[u64]=conv;
+		conv_to_u64[conv]=u64;
+		conv_last_active_time[conv]=get_current_time();
+		return 0;
+	}
+	int erase_conv(uint32_t conv)
+	{
+		uint64_t u64=conv_to_u64[conv];
+		conv_to_u64.erase(conv);
+		u64_to_conv.erase(u64);
+		conv_last_active_time.erase(conv);
+		return 0;
+	}
+	int clean_inactive()
+	{
+		map<uint32_t,uint64_t>::iterator old_it;
+		map<uint32_t,uint64_t>::iterator it;
+		int cnt=0;
+		it=clear_it;
+		int size=conv_last_active_time.size();
+		int num_to_clean=size/conv_clear_ratio;   //clear 1/10 each time,to avoid latency glitch
+
+		uint64_t current_time=get_current_time();
+		for(;;)
+		{
+			if(cnt>=num_to_clean) break;
+			if(conv_last_active_time.begin()==conv_last_active_time.end()) break;
+
+			if(it==conv_last_active_time.end())
+			{
+				it=conv_last_active_time.begin();
+			}
+
+			if( current_time -it->second  >conv_timeout )
+			{
+				old_it=it;
+				it++;
+				erase_conv(old_it->first);
+			}
+			else
+			{
+				it++;
+			}
+			cnt++;
+		}
+		return 0;
+	}
+};
 void init_filter(int port)
 {
 	code[8].k=code[10].k=port;
@@ -690,7 +800,7 @@ int send_raw(packet_info_t &info,char * payload,int payloadlen)
     	 }
     	 else if(seq_mode==2)
     	 {
-    		 if(random()% 20==5 )
+    		 if(random()% 5==3 )
     			 g_packet_info.seq+=payloadlen;
     	 }
      }
@@ -730,9 +840,12 @@ int send_data(packet_info_t &info,char* data,int len,uint32_t id1,uint32_t id2,u
 	return 0;
 }
 
-int send_hb(packet_info_t &info,uint32_t id1,uint32_t id2 )
+
+const int hb_length=1+3*sizeof(uint32_t);
+
+int send_hb(packet_info_t &info,uint32_t id1,uint32_t id2 ,uint32_t id3)
 {
-	int new_len=1+sizeof(my_id)*2;
+	int new_len=1+sizeof(my_id)*3;
 	send_data_buf[0]='h';
 
 	uint32_t tmp;
@@ -742,6 +855,9 @@ int send_hb(packet_info_t &info,uint32_t id1,uint32_t id2 )
 	tmp=htonl(id2);
 	memcpy(send_data_buf+1+sizeof(my_id),&tmp,sizeof(my_id));
 
+	tmp=htonl(id3);
+	memcpy(send_data_buf+1+sizeof(my_id)*2,&tmp,sizeof(my_id));
+
 	if(pre_send(send_data_buf,new_len)<0)
 	{
 		return -1;
@@ -763,13 +879,6 @@ int send_hb(packet_info_t &info,uint32_t id1,uint32_t id2 )
 	return 0;
 }*/
 
-uint32_t get_true_random_number()
-{
-	uint32_t ret;
-	int fd=open("/dev/urandom",O_RDONLY);
-	read(fd,&ret,sizeof(ret));
-	return htonl(ret);
-}
 
 int try_to_list_and_bind(int port)
 {
@@ -802,7 +911,7 @@ int try_to_list_and_bind(int port)
 int client_bind_to_a_new_port()
 {
 	int raw_send_port=10000+get_true_random_number()%(65535-10000);
-	for(int i=0;i<1000;i++)
+	for(int i=0;i<1000;i++)//try 1000 times at max,this should be enough
 	{
 		if (try_to_list_and_bind(raw_send_port)==0)
 		{
@@ -850,7 +959,8 @@ int fake_tcp_keep_connection_client() //for client
 		{
 			client_current_state=client_nothing;
 			printf("state back to nothing\n");
-			goto begin;
+			return 0;
+			//goto begin;
 		}
 		else
 		{
@@ -866,7 +976,8 @@ int fake_tcp_keep_connection_client() //for client
 		{
 			client_current_state=client_nothing;
 			printf("state back to nothing\n");
-			goto begin;
+			return 0;
+			//goto begin;
 		}
 		else
 		{
@@ -883,12 +994,13 @@ int fake_tcp_keep_connection_client() //for client
 		{
 			client_current_state=client_nothing;
 			printf("state back to nothing\n");
-			goto begin;
+			return 0;
+			//goto begin;
 		}
 		else
 		{
 			retry_counter--;
-			send_hb(g_packet_info,my_id,oppsite_id);
+			send_hb(g_packet_info,my_id,oppsite_id,const_id);
 			last_state_time=get_current_time();
 			printf("retry send heart_beat  counter left:%d\n",retry_counter);
 			printf("heartbeat sent <%x,%x>\n",oppsite_id,my_id);
@@ -916,7 +1028,7 @@ int fake_tcp_keep_connection_client() //for client
 
 		if(debug_mode)printf("heartbeat sent <%x,%x>\n",oppsite_id,my_id);
 
-		send_hb(g_packet_info,my_id,oppsite_id);
+		send_hb(g_packet_info,my_id,oppsite_id,const_id);
 		last_hb_sent_time=get_current_time();
 	}
 
@@ -955,7 +1067,7 @@ int fake_tcp_keep_connection_server()
 		else
 		{
 			retry_counter--;
-			send_hb(g_packet_info,my_id,0);
+			send_hb(g_packet_info,my_id,0,const_id);
 			last_state_time=get_current_time();
 			printf("half heart beat sent<%x>\n",my_id);
 		}
@@ -981,7 +1093,7 @@ int fake_tcp_keep_connection_server()
 			return 0;
 		}
 
-		send_hb(g_packet_info,my_id,oppsite_id);
+		send_hb(g_packet_info,my_id,oppsite_id,const_id);
 		last_hb_sent_time=get_current_time();
 
 		if(debug_mode) printf("heart beat sent<%x>\n",my_id);
@@ -1017,7 +1129,7 @@ int set_timer(int epollfd,int &timer_fd)
 	}
 }
 
-int client_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len)
+int client_raw_recv(iphdr *iph,tcphdr *tcph,char * data,int data_len)
 {
 
 	if(client_current_state==client_syn_sent )
@@ -1052,7 +1164,7 @@ int client_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len)
 			printf("unexpected syn ack or other zero lenght packet\n");
 			return 0;
 		}
-		if(data_len!=sizeof(my_id)*2+1||data[0]!='h')
+		if(data_len<hb_length||data[0]!='h')
 		{
 			printf("not a heartbeat\n");
 			return 0;
@@ -1067,7 +1179,7 @@ int client_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len)
 
 		printf("====first hb received %x\n==",oppsite_id);
 		printf("changed state to client_heartbeat_sent\n");
-		send_hb(g_packet_info,my_id,oppsite_id);
+		send_hb(g_packet_info,my_id,oppsite_id,const_id);
 
 		client_current_state=client_heartbeat_sent;
 		last_state_time=get_current_time();
@@ -1080,7 +1192,7 @@ int client_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len)
 			printf("unexpected syn ack or other zero lenght packet\n");
 			return 0;
 		}
-		if(data_len!=sizeof(my_id)*2+1||data[0]!='h')
+		if(data_len<hb_length||data[0]!='h')
 		{
 			printf("not a heartbeat\n");
 			return 0;
@@ -1124,7 +1236,7 @@ int client_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len)
 			return 0;
 		}
 
-		if(data_len==sizeof(my_id)*2+1&&data[0]=='h')
+		if(data_len<hb_length&&data[0]=='h')
 		{
 			if(debug_mode)printf("heart beat received\n");
 			last_hb_recv_time=get_current_time();
@@ -1203,7 +1315,7 @@ int server_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len)
 		g_packet_info.ack=1;
 		g_packet_info.seq+=1;////////is this right?
 
-		send_hb(g_packet_info,my_id,0);   // send a hb immidately
+		send_hb(g_packet_info,my_id,0,const_id);   // send a hb immidately
 
 		printf("changed state to server_heartbeat_sent\n");
 
@@ -1221,7 +1333,7 @@ int server_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len)
 			printf("unexpected adress\n");
 			return 0;
 		}
-		if(data_len!=sizeof(my_id)*2+1||data[0]!='h')
+		if(data_len<hb_length||data[0]!='h')
 		{
 			return 0;
 		}
@@ -1239,7 +1351,7 @@ int server_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len)
 		int tmp_oppsite_session_id=  ntohl(* ((uint32_t *)&data[1]));
 		oppsite_id=tmp_oppsite_session_id;
 
-		send_hb(g_packet_info,my_id,oppsite_id);
+		send_hb(g_packet_info,my_id,oppsite_id,const_id);
 
 		server_current_state=server_ready;
 		last_state_time=get_current_time();
@@ -1259,7 +1371,7 @@ int server_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len)
 			return 0;
 		}
 
-		if(data[0]=='h'&&data_len==sizeof(my_id)*2+1)
+		if(data[0]=='h'&&data_len>=hb_length)
 		{
 			uint32_t tmp= ntohl(* ((uint32_t *)&data[1+sizeof(uint32_t)]));
 			if(debug_mode)printf("received hb <%x,%x>\n",oppsite_id,tmp);
@@ -1368,7 +1480,7 @@ int server_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len)
 
 
 
-int on_raw_recv()
+int on_raw_recv(iphdr * & iph,tcphdr * &tcph,char * &data,int &data_len)
 {
 	int size;
 	struct sockaddr saddr;
@@ -1388,7 +1500,7 @@ int on_raw_recv()
 
 	char *ip_begin=buf+14;
 
-	struct iphdr *iph = (struct iphdr *) (ip_begin);
+	iph = (struct iphdr *) (ip_begin);
 
 
     if (!(iph->ihl > 0 && iph->ihl <=60)) {
@@ -1405,7 +1517,7 @@ int on_raw_recv()
 	int ip_len=ntohs(iph->tot_len);
 
     unsigned short iphdrlen =iph->ihl*4;
-    struct tcphdr *tcph=(struct tcphdr*)(ip_begin+ iphdrlen);
+    tcph=(struct tcphdr*)(ip_begin+ iphdrlen);
     unsigned short tcphdrlen = tcph->doff*4;
 
     if (!(tcph->doff > 0 && tcph->doff <=60)) {
@@ -1479,9 +1591,9 @@ int on_raw_recv()
 
    // char pseudo_tcp_buffer[MTU];
 
-    int data_len = ip_len-tcphdrlen-iphdrlen;
+    data_len = ip_len-tcphdrlen-iphdrlen;
 
-    char *data=ip_begin+tcphdrlen+iphdrlen;
+    data=ip_begin+tcphdrlen+iphdrlen;
 
     if(data_len>0&&data[0]=='h')
     {
@@ -1515,21 +1627,7 @@ int on_raw_recv()
 		//fflush(stdout);
     }
 
-    int new_len=data_len;
-    memcpy(raw_recv_buf3,data,new_len);
-    if(data_len!=0)
-    {
-    	if(pre_recv(raw_recv_buf3,new_len)<0)
-    		return -1;
-    }
-	if(prog_mode==server_mode)
-	{
-		server_raw_recv(iph,tcph,raw_recv_buf3,new_len);
-	}
-	else
-	{
-		client_raw_recv(iph,tcph,raw_recv_buf3,new_len);
-	}
+
 	return 0;
 }
 int client()
@@ -1610,19 +1708,17 @@ int client()
 		for (n = 0; n < nfds; ++n) {
 			if (events[n].data.u64 == epoll_raw_recv_fd_sn)
 			{
-				on_raw_recv();
-				/*if(is_sync_ack)
-				{
-
-				}
-				else if(is heart_beat)
-				{
-
-				}
-				else if(is_data)
-				{
-					sendto();
-				}*/
+				iphdr *iph;tcphdr *tcph;char* data;int data_len;
+				on_raw_recv(iph,tcph,data,data_len);
+
+			    int new_len=data_len;
+			    memcpy(raw_recv_buf3,data,new_len);
+			    if(data_len!=0)
+			    {
+			    	if(pre_recv(raw_recv_buf3,new_len)<0)
+			    		continue;
+			    }
+				client_raw_recv(iph,tcph,raw_recv_buf3,new_len);
 			}
 			if(events[n].data.u64 ==epoll_timer_fd_sn)
 			{
@@ -1744,10 +1840,11 @@ int server()
 			{
 				int recv_len=recv(udp_fd,buf,buf_len,0);
 				printf("received a packet from udp_fd,len:%d\n",recv_len);
-				perror("wtf?");
+
 				if(recv_len<0)
 				{
 					printf("continue\n");
+					perror("wtf?");
 					continue;
 					//return 0;
 				}
@@ -1762,7 +1859,18 @@ int server()
 			}
 			if (events[n].data.u64 == epoll_raw_recv_fd_sn)
 			{
-				on_raw_recv();
+				iphdr *iph;tcphdr *tcph;char* data;int data_len;
+				on_raw_recv(iph,tcph,data,data_len);
+
+			    int new_len=data_len;
+			    memcpy(raw_recv_buf3,data,new_len);
+			    if(data_len!=0)
+			    {
+			    	if(pre_recv(raw_recv_buf3,new_len)<0)
+			    		continue;
+			    }
+
+				server_raw_recv(iph,tcph,raw_recv_buf3,new_len);
 			}
 
 		}
@@ -1771,6 +1879,7 @@ int server()
 }
 int main(int argc, char *argv[])
 {
+	const_id=get_true_random_number();
 
 	g_packet_info.ack_seq=get_true_random_number();
 	g_packet_info.seq=get_true_random_number();