Browse Source

implemented all direction

wangyu- 8 years ago
parent
commit
507b960ba3
9 changed files with 128 additions and 107 deletions
  1. 9 0
      common.cpp
  2. 31 0
      common.h
  3. 6 14
      connection.cpp
  4. 3 2
      connection.h
  5. 28 10
      fd_manager.cpp
  6. 16 11
      fd_manager.h
  7. 31 38
      main.cpp
  8. 4 2
      packet.cpp
  9. 0 30
      packet.h

+ 9 - 0
common.cpp

@@ -416,3 +416,12 @@ int create_new_udp(int &new_udp_fd,int remote_address_uint32,int remote_port)
 	}
 	return 0;
 }*/
+void ip_port_t::from_u64(u64_t u64)
+{
+	ip=get_u64_h(u64);
+	port=get_u64_l(u64);
+}
+u64_t ip_port_t::to_u64()
+{
+	return pack_u64(ip,port);
+}

+ 31 - 0
common.h

@@ -124,6 +124,37 @@ typedef u64_t padding_t;
 
 typedef u64_t anti_replay_seq_t;
 
+typedef u64_t fd64_t;
+
+enum dest_type{none=0,type_ip_port,type_fd64,type_fd};
+
+
+struct ip_port_t
+{
+	u32_t ip;
+	int port;
+	void from_u64(u64_t u64);
+	u64_t to_u64();
+};
+
+union inner_t
+{
+	ip_port_t ip_port;
+	int fd;
+	fd64_t fd64;
+};
+struct dest_t
+{
+	dest_type type;
+	inner_t inner;
+};
+
+struct fd_info_t
+{
+	ip_port_t ip_port;
+};
+
+
 u64_t get_current_time();
 u64_t get_current_time_us();
 u64_t pack_u64(u32_t a,u32_t b);

+ 6 - 14
connection.cpp

@@ -166,7 +166,7 @@ conv_manager_t::~conv_manager_t()
  {
 	 ready_num=0;
 	 mp.reserve(100007);
-	 fd64_mp.reserve(100007);
+	 //fd64_mp.reserve(100007);
 	 clear_it=mp.begin();
 	 last_clear_time=0;
  }
@@ -209,6 +209,7 @@ conv_manager_t::~conv_manager_t()
 	 }
 	 return *mp[u64];
  }
+ /*
  int conn_manager_t::exist_fd64(fd64_t fd64)
  {
 	 return fd64_mp.find(fd64)!=fd64_mp.end();
@@ -225,7 +226,7 @@ conv_manager_t::~conv_manager_t()
 	 ip_port_t res;
 	 res.from_u64(fd64_mp[fd64]);
 	 return res;
- }
+ }*/
  int conn_manager_t::erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it)
  {
 	 /*
@@ -310,17 +311,8 @@ void server_clear_function(u64_t u64)//used in conv_manager in server mode.for s
 {
 	int fd64=u64;
 	int ret;
-	assert(fd_manager.fd64_exist(fd64));
-	int fd=fd_manager.fd64_to_fd(fd64);
+	assert(fd_manager.exist(fd64));
+	int fd=fd_manager.to_fd(fd64);
 
-	fd_manager.remove_fd64(fd64);
-	ret= close(fd);  //closed fd should be auto removed from epoll
-	if (ret!=0)
-	{
-		mylog(log_fatal,"close fd %d failed !!!!\n",fd);
-		myexit(-1);  //this shouldnt happen
-	}
-	//mylog(log_fatal,"size:%d !!!!\n",conn_manager.udp_fd_mp.size());
-	assert(conn_manager.fd64_mp.find(fd)!=conn_manager.fd64_mp.end());
-	conn_manager.fd64_mp.erase(fd);
+	fd_manager.close(fd64);
 }

+ 3 - 2
connection.h

@@ -79,7 +79,7 @@ struct conn_manager_t  //manager for connections. for client,we dont need conn_m
 
  u32_t ready_num;
 
- unordered_map<fd64_t,u64_t> fd64_mp;
+// unordered_map<fd64_t,u64_t> fd64_mp;
  unordered_map<u64_t,conn_info_t*> mp;//<ip,port> to conn_info_t;
  	 	 	 	 	 	 	 	 	  //put it at end so that it de-consturcts first
 
@@ -91,9 +91,10 @@ struct conn_manager_t  //manager for connections. for client,we dont need conn_m
  int exist_ip_port(ip_port_t);
  conn_info_t *& find_insert_p(ip_port_t);  //be aware,the adress may change after rehash
  conn_info_t & find_insert(ip_port_t) ; //be aware,the adress may change after rehash
+ /*
  int exist_fd64(fd64_t fd64);
  void insert_fd64(fd64_t fd64,ip_port_t);
- ip_port_t find_by_fd64(fd64_t fd64);
+ ip_port_t find_by_fd64(fd64_t fd64);*/
 
 
  int erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it);

+ 28 - 10
fd_manager.cpp

@@ -11,20 +11,22 @@ int fd_manager_t::fd_exist(int fd)
 {
 	return fd_to_fd64_mp.find(fd)!=fd_to_fd64_mp.end();
 }
-int fd_manager_t::fd64_exist(fd64_t fd64)
+int fd_manager_t::exist(fd64_t fd64)
 {
 	return fd64_to_fd_mp.find(fd64)!=fd64_to_fd_mp.end();
 }
+/*
 fd64_t fd_manager_t::fd_to_fd64(int fd)
 {
 	assert(fd_exist(fd));
 	return fd_to_fd64_mp[fd];
-}
-int fd_manager_t::fd64_to_fd(fd64_t fd64)
+}*/
+int fd_manager_t::to_fd(fd64_t fd64)
 {
-	assert(fd64_exist(fd64));
+	assert(exist(fd64));
 	return fd64_to_fd_mp[fd64];
 }
+/*
 void fd_manager_t::remove_fd(int fd)
 {
 	assert(fd_exist(fd));
@@ -32,27 +34,34 @@ void fd_manager_t::remove_fd(int fd)
 	fd_to_fd64_mp.erase(fd);
 	fd64_to_fd_mp.erase(fd64);
 	//return 0;
-}
-void fd_manager_t::remove_fd64(fd64_t fd64)
+}*/
+void fd_manager_t::close(fd64_t fd64)
 {
-	assert(fd64_exist(fd64));
+	assert(exist(fd64));
 	int fd=fd64_to_fd_mp[fd64];
 	fd64_to_fd_mp.erase(fd64);
 	fd_to_fd64_mp.erase(fd);
+	if(exist_info(fd64))
+	{
+		fd_info_mp.erase(fd64);
+	}
+	close(fd);
 	//return 0;
 }
 void fd_manager_t::reserve()
 {
-	fd_to_fd64_mp.reserve(100007);
-	fd64_to_fd_mp.reserve(100007);
+	fd_to_fd64_mp.reserve(10007);
+	fd64_to_fd_mp.reserve(10007);
+	fd_info_mp.reserve(10007);
 	//return 0;
 }
-u64_t fd_manager_t::insert_fd(int fd)
+u64_t fd_manager_t::create(int fd)
 {
 	assert(!fd_exist(fd));
 	fd64_t fd64=counter++;
 	fd_to_fd64_mp[fd]=fd64;
 	fd64_to_fd_mp[fd64]=fd;
+	//fd_info_mp[fd64];
 	return fd64;
 }
 fd_manager_t::fd_manager_t()
@@ -60,3 +69,12 @@ fd_manager_t::fd_manager_t()
 	counter=u32_t(-1);
 	counter+=2;
 }
+fd_info_t & fd_manager_t::get_info(fd64_t fd64)
+{
+	assert(exist(fd64));
+	return fd_info_mp[fd64];
+}
+int fd_manager_t::exist_info(fd64_t fd64)
+{
+	return fd_info_mp.find(fd64)!=fd_info_mp.end();
+}

+ 16 - 11
fd_manager.h

@@ -9,24 +9,29 @@
 #define FD_MANAGER_H_
 
 #include "common.h"
+#include "packet.h"
+
 
-typedef u64_t fd64_t;
 
 struct fd_manager_t   //conver fd to a uniq 64bit number,avoid fd value conflict caused by close and re-create
 //not used currently
 {
-	u64_t counter;
-	unordered_map<int,u64_t> fd_to_fd64_mp;
-	unordered_map<u64_t,int> fd64_to_fd_mp;
-	int fd_exist(int fd);
-	int fd64_exist(fd64_t fd64);
-	fd64_t fd_to_fd64(int fd);
-	int fd64_to_fd(fd64_t);
-	void remove_fd(int fd);
-	void remove_fd64(fd64_t fd64);
+	fd_info_t & get_info(fd64_t fd64);
+	int exist_info(fd64_t);
+	int exist(fd64_t fd64);
+	int to_fd(fd64_t);
+	void close(fd64_t fd64);
 	void reserve();
-	u64_t insert_fd(int fd);
+	u64_t create(int fd);
 	fd_manager_t();
+private:
+	u64_t counter;
+	unordered_map<int,fd64_t> fd_to_fd64_mp;
+	unordered_map<fd64_t,int> fd64_to_fd_mp;
+	unordered_map<fd64_t,fd_info_t> fd_info_mp;
+	int fd_exist(int fd);
+	//void remove_fd(int fd);
+	//fd64_t fd_to_fd64(int fd);
 };
 
 extern fd_manager_t fd_manager;

+ 31 - 38
main.cpp

@@ -186,16 +186,10 @@ int client_event_loop()
 				get_conv(conv,data,data_len,new_data,new_len);
 				if(!conn_info.conv_manager.is_conv_used(conv))continue;
 				u64_t u64=conn_info.conv_manager.find_conv_by_u64(conv);
-				u32_t ip=get_u64_h(u64);
-				int port=get_u64_l(u64);
 				dest_t dest;
+				dest.inner.ip_port.from_u64(u64);
 				dest.type=type_ip_port;
-				dest.inner.ip_port.ip=ip;
-				dest.inner.ip_port.port=port;
 				my_send(dest,new_data,new_len);
-				//sendto_ip_port(ip,port,new_data,new_len,0);
-
-				//////////////////todo
 			}
 			/*
 			else if(events[idx].data.u64 ==(u64_t)timer_fd)
@@ -226,7 +220,10 @@ int client_event_loop()
 				mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
 						ntohs(udp_new_addr_in.sin_port),data_len);
 
-				u64_t u64=((u64_t(udp_new_addr_in.sin_addr.s_addr))<<32u)+ntohs(udp_new_addr_in.sin_port);
+				ip_port_t ip_port;
+				ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
+				ip_port.port=udp_new_addr_in.sin_port;
+				u64_t u64=ip_port.to_u64();
 				u32_t conv;
 
 				if(!conn_info.conv_manager.is_u64_used(u64))
@@ -244,7 +241,6 @@ int client_event_loop()
 				{
 					conv=conn_info.conv_manager.find_conv_by_u64(u64);
 				}
-
 				conn_info.conv_manager.update_active_time(conv);
 
 				char *new_data;
@@ -371,23 +367,23 @@ int server_event_loop()
 				if (!conn_info.conv_manager.is_conv_used(conv))
 				{
 					int new_udp_fd;
-					new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port);
+					ret=new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port);
 
 					if (ret != 0) {
-						mylog(log_warn, "[%s:%d]add udp_fd error\n",my_ntoa(ip_port.ip),ip_port.port);
-						close(new_udp_fd);
-						return -1;
+						mylog(log_warn, "[%s:%d]new_connected_socket failed\n",my_ntoa(ip_port.ip),ip_port.port);
+						continue;
 					}
 
-					fd64_t fd64 = fd_manager.insert_fd(new_udp_fd);
+					fd64_t fd64 = fd_manager.create(new_udp_fd);
 					ev.events = EPOLLIN;
 					ev.data.u64 = fd64;
 					ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_udp_fd, &ev);
 
 					conn_info.conv_manager.insert_conv(conv, fd64);
-					assert(!conn_manager.exist_fd64(fd64));
+					fd_manager.get_info(fd64).ip_port=ip_port;
+					//assert(!conn_manager.exist_fd64(fd64));
 
-					conn_manager.insert_fd64(fd64,ip_port);
+					//conn_manager.insert_fd64(fd64,ip_port);
 				}
 				fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv);
 				//int fd=fd_manager.fd64_to_fd(fd64);
@@ -438,37 +434,27 @@ int server_event_loop()
 				char data[buf_len];
 				int data_len;
 				fd64_t fd64=events[idx].data.u64;
-				if(!fd_manager.fd64_exist(fd64))
+				if(!fd_manager.exist(fd64))   //fd64 has been closed
 				{
 					continue;
 				}
-				int fd=fd_manager.fd64_to_fd(fd64);
-				if(!conn_manager.exist_fd64(fd64)) //this can happen,when fd is a just closed fd
-				{
-					mylog(log_debug,"fd no longer exists in udp_fd_mp,udp fd64 %lld\n",fd64);
-					recv(fd,0,0,0);
-					continue;
-				}
-				ip_port_t ip_port=conn_manager.find_by_fd64(fd64);
-				conn_info_t* p_conn_info=conn_manager.find_insert_p(ip_port);
 
-				if(!conn_manager.exist_ip_port(ip_port))//TODO remove this for peformance
-				{
-					mylog(log_fatal,"ip port no longer exits 2!!!this shouldnt happen\n");
-					myexit(-1);
-				}
+				//assert(conn_manager.exist_fd64(fd64));
+
+				assert(fd_manager.exist_info(fd64));
+				ip_port_t ip_port=fd_manager.get_info(fd64).ip_port;
+
+				assert(conn_manager.exist_ip_port(ip_port));
+
+				conn_info_t* p_conn_info=conn_manager.find_insert_p(ip_port);
 
 				conn_info_t &conn_info=*p_conn_info;
 
-				if(!conn_info.conv_manager.is_u64_used(fd))
-				{
-					mylog(log_debug,"conv no longer exists,udp fd %d\n",fd);
-					int recv_len=recv(fd,0,0,0); ///////////TODO ,delete this
-					continue;
-				}
+				assert(conn_info.conv_manager.is_u64_used(fd64));
 
-				u32_t conv_id=conn_info.conv_manager.find_conv_by_u64(fd);
+				u32_t conv=conn_info.conv_manager.find_conv_by_u64(fd64);
 
+				int fd=fd_manager.to_fd(fd64);
 				data_len=recv(fd,data,max_data_len,0);
 
 				mylog(log_trace,"received a packet from udp_fd,len:%d\n",data_len);
@@ -485,6 +471,13 @@ int server_event_loop()
 					mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
 				}
 
+				char *new_data;
+				int new_len;
+				put_conv(conv,data,data_len,new_data,new_len);
+				dest_t dest;
+				dest.type=type_ip_port;
+				dest.inner.ip_port=ip_port;
+				my_send(dest,new_data,new_len);
 				////////todo send data
 			}
 			else

+ 4 - 2
packet.cpp

@@ -26,6 +26,8 @@ char key_string[1000]= "secret key";
 
 int local_listen_fd=-1;
 
+
+
 struct anti_replay_t
 {
 	u64_t max_packet_received;
@@ -244,8 +246,8 @@ int my_send(dest_t &dest,char *data,int len)
 		}
 		case type_fd64:
 		{
-			if(!fd_manager.fd64_exist(dest.inner.fd64)) return -1;
-			int fd=fd_manager.fd64_to_fd(dest.inner.fd64);
+			if(!fd_manager.exist(dest.inner.fd64)) return -1;
+			int fd=fd_manager.to_fd(dest.inner.fd64);
 			return send_fd(fd,data,len,0);
 			break;
 		}

+ 0 - 30
packet.h

@@ -24,36 +24,6 @@ extern int random_drop;
 extern int local_listen_fd;
 
 
-enum dest_type{none=0,type_ip_port,type_fd64,type_fd};
-
-
-struct ip_port_t
-{
-	u32_t ip;
-	int port;
-	void from_u64(u64_t u64)
-	{
-		ip=get_u64_h(u64);
-		port=get_u64_l(u64);
-	}
-	u64_t to_u64()
-	{
-		return pack_u64(ip,port);
-	}
-
-};
-union inner_t
-{
-	ip_port_t ip_port;
-	int fd;
-	fd64_t fd64;
-};
-struct dest_t
-{
-	dest_type type;
-	inner_t inner;
-};
-
 int my_send(dest_t &dest,char *data,int len);
 
 void encrypt_0(char * input,int &len,char *key);