wangyu- 8 年 前
コミット
a71576c180
12 ファイル変更140 行追加56 行削除
  1. 1 1
      common.cpp
  2. 21 14
      connection.cpp
  3. 5 11
      connection.h
  4. 10 5
      delay_manager.cpp
  5. 2 2
      delay_manager.h
  6. 5 6
      fd_manager.cpp
  7. 1 1
      fd_manager.h
  8. 28 0
      lib/rs.c
  9. 4 0
      lib/rs.h
  10. 61 14
      main.cpp
  11. 1 1
      packet.cpp
  12. 1 1
      packet.h

+ 1 - 1
common.cpp

@@ -22,7 +22,7 @@ char iptables_rule[200]="";
 
 program_mode_t program_mode=unset_mode;//0 unset; 1client 2server
 
-u64_t get_current_time()
+u64_t get_current_time()//ms
 {
 	timespec tmp_time;
 	clock_gettime(CLOCK_MONOTONIC, &tmp_time);

+ 21 - 14
connection.cpp

@@ -92,9 +92,13 @@ conv_manager_t::~conv_manager_t()
 	}
 	int conv_manager_t::insert_conv(u32_t conv,u64_t u64)//////todo add capacity
 	{
+		int bucket_size_before=conv_last_active_time.bucket_count();
 		u64_to_conv[u64]=conv;
 		conv_to_u64[conv]=u64;
 		conv_last_active_time[conv]=get_current_time();
+		int bucket_size_after=conv_last_active_time.bucket_count();
+		if(bucket_size_after!=bucket_size_before)
+			clear_it=conv_last_active_time.begin();
 		return 0;
 	}
 	int conv_manager_t::erase_conv(u32_t conv)
@@ -168,13 +172,13 @@ conv_manager_t::~conv_manager_t()
 	}
  conn_manager_t::conn_manager_t()
  {
-	 ready_num=0;
+	 //ready_num=0;
 	 mp.reserve(100007);
 	 //fd64_mp.reserve(100007);
 	 clear_it=mp.begin();
 	 last_clear_time=0;
  }
- int conn_manager_t::exist_ip_port(ip_port_t ip_port)
+ int conn_manager_t::exist(ip_port_t ip_port)
  {
 	 u64_t u64=ip_port.to_u64();
 	 if(mp.find(u64)!=mp.end())
@@ -193,27 +197,30 @@ conv_manager_t::~conv_manager_t()
 	 mp[u64];
 	 return 0;
  }*/
- conn_info_t *& conn_manager_t::find_insert_p(ip_port_t ip_port) //todo capacity
+
+ conn_info_t *& conn_manager_t::find_p(ip_port_t ip_port) //todo capacity
  //be aware,the adress may change after rehash
  {
+	 assert(exist(ip_port));
 	 u64_t u64=ip_port.to_u64();
-	 unordered_map<u64_t,conn_info_t*>::iterator it=mp.find(u64);
-	 if(it==mp.end())
-	 {
-		 mp[u64]=new conn_info_t;
-	 }
 	 return mp[u64];
  }
- conn_info_t & conn_manager_t::find_insert(ip_port_t ip_port)  //be aware,the adress may change after rehash
+ conn_info_t & conn_manager_t::find(ip_port_t ip_port)  //be aware,the adress may change after rehash
  {
+	 assert(exist(ip_port));
 	 u64_t u64=ip_port.to_u64();
-	 unordered_map<u64_t,conn_info_t*>::iterator it=mp.find(u64);
-	 if(it==mp.end())
-	 {
-		 mp[u64]=new conn_info_t;
-	 }
 	 return *mp[u64];
  }
+ int conn_manager_t::insert(ip_port_t ip_port)
+ {
+	    assert(!exist(ip_port));
+	    int bucket_size_before=mp.bucket_count();
+	    mp[ip_port.to_u64()]=new conn_info_t;
+		int bucket_size_after=mp.bucket_count();
+		if(bucket_size_after!=bucket_size_before)
+			clear_it=mp.begin();
+		return 0;
+ }
  /*
  int conn_manager_t::exist_fd64(fd64_t fd64)
  {

+ 5 - 11
connection.h

@@ -75,27 +75,21 @@ struct conn_info_t     //stores info for a raw connection.for client ,there is o
 struct conn_manager_t  //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections
 {
 
- u32_t ready_num;
-
-// unordered_map<fd64_t,u64_t> fd64_mp;
  unordered_map<u64_t,conn_info_t*> mp;//<ip,port> to conn_info_t;
- 	 	 	 	 	 	 	 	 	  //put it at end so that it de-consturcts first
-
  unordered_map<u64_t,conn_info_t*>::iterator clear_it;
-
  long long last_clear_time;
 
  conn_manager_t();
- int exist_ip_port(ip_port_t);
- conn_info_t *& find_insert_p(ip_port_t);  //be aware,the adress may change after rehash
- conn_info_t & find_insert(ip_port_t) ; //be aware,the adress may change after rehash
+ int exist(ip_port_t);
+ conn_info_t *& find_p(ip_port_t);  //be aware,the adress may change after rehash
+ conn_info_t & find(ip_port_t) ; //be aware,the adress may change after rehash
+ int insert(ip_port_t);
  /*
  int exist_fd64(fd64_t fd64);
  void insert_fd64(fd64_t fd64,ip_port_t);
  ip_port_t find_by_fd64(fd64_t fd64);*/
 
-
- int erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it);
+int erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it);
 int clear_inactive();
 int clear_inactive0();
 

+ 10 - 5
delay_manager.cpp

@@ -34,14 +34,19 @@ delay_manager_t::~delay_manager_t()
 {
 	//TODO ,we currently dont need to deconstruct it
 }
-/*
+
 int delay_manager_t::get_timer_fd()
 {
-	return delay_timer_fd;
-}*/
-
-int delay_manager_t::add(my_time_t delay,delay_data_t &delay_data)
+	return timer_fd;
+}
+//int add(my_time_t delay,const dest_t &dest,const char *data,int len);
+int delay_manager_t::add(my_time_t delay,const dest_t &dest,char *data,int len)
 {
+	delay_data_t delay_data;
+	delay_data.dest=dest;
+	delay_data.data=data;
+	delay_data.len=len;
+
 	if(capacity!=0&&int(delay_mp.size()) >=capacity)
 	{
 		mylog(log_warn,"max pending packet reached,ignored\n");

+ 2 - 2
delay_manager.h

@@ -43,9 +43,9 @@ struct delay_manager_t
 	multimap<my_time_t,delay_data_t> delay_mp;  //unit us,1 us=0.001ms
 	delay_manager_t();
 	~delay_manager_t();
-	//int get_timer_fd();
+	int get_timer_fd();
 	int check();
-	int add(my_time_t delay,delay_data_t &delay_data);
+	int add(my_time_t delay,const dest_t &dest,char *data,int len);
 };
 
 #endif /* DELAY_MANAGER_H_ */

+ 5 - 6
fd_manager.cpp

@@ -48,12 +48,11 @@ void fd_manager_t::close(fd64_t fd64)
 	close(fd);
 	//return 0;
 }
-void fd_manager_t::reserve()
+void fd_manager_t::reserve(int n)
 {
-	fd_to_fd64_mp.reserve(10007);
-	fd64_to_fd_mp.reserve(10007);
-	fd_info_mp.reserve(10007);
-	//return 0;
+	fd_to_fd64_mp.reserve(n);
+	fd64_to_fd_mp.reserve(n);
+	fd_info_mp.reserve(n);
 }
 u64_t fd_manager_t::create(int fd)
 {
@@ -67,7 +66,7 @@ u64_t fd_manager_t::create(int fd)
 fd_manager_t::fd_manager_t()
 {
 	counter=u32_t(-1);
-	counter+=2;
+	counter+=10;
 }
 fd_info_t & fd_manager_t::get_info(fd64_t fd64)
 {

+ 1 - 1
fd_manager.h

@@ -21,7 +21,7 @@ struct fd_manager_t   //conver fd to a uniq 64bit number,avoid fd value conflict
 	int exist(fd64_t fd64);
 	int to_fd(fd64_t);
 	void close(fd64_t fd64);
-	void reserve();
+	void reserve(int n);
 	u64_t create(int fd);
 	fd_manager_t();
 private:

+ 28 - 0
lib/rs.c

@@ -5,6 +5,8 @@
  *      Author: root
  */
 #include "rs.h"
+#include "stdlib.h"
+#include "string.h"
 
 void rs_encode(void *code,char *data[],int size)
 {
@@ -42,3 +44,29 @@ int rs_decode(void *code,char *data[],int size)
 	}
 	return fec_decode(code,(void**)data,index,size);
 }
+
+static void * (*table)[256]=0;
+void* get_code(int k,int n)
+{
+	if (table==0)
+	{
+		table=(void* (*)[256]) malloc(sizeof(void*)*256*256);
+		memset(table,0,sizeof(void*)*256*256);
+	}
+	if(table[k][n]==0)
+	{
+		table[k][n]=fec_new(k,n);
+	}
+	return table[k][n];
+}
+void rs_encode2(int k,int n,char *data[],int size)
+{
+	void* code=get_code(k,n);
+	rs_encode(code,data,size);
+}
+
+int rs_decode2(int k,int n,char *data[],int size)
+{
+	void* code=get_code(k,n);
+	return rs_decode(code,data,size);
+}

+ 4 - 0
lib/rs.h

@@ -40,6 +40,10 @@ void rs_encode(void *code,char *data[],int size);
 int rs_decode(void *code,char *data[],int size);
 
 
+void rs_encode2(int k,int n,char *data[],int size);
+
+int rs_decode2(int k,int n,char *data[],int size);
+
 
 
 

+ 61 - 14
main.cpp

@@ -100,6 +100,21 @@ int new_connected_socket(int &fd,u32_t ip,int port)
 	}
 	return 0;
 }
+int delay_send(my_time_t delay,const dest_t &dest,char *data,int len)
+{
+	return delay_manager.add(delay,dest,data,len);;
+}
+int from_normal_to_fec(const dest_t &dest,char *data,int len)
+{
+	delay_send(0,dest,data,len);
+	delay_send(1000*1000,dest,data,len);
+	return 0;
+}
+int from_fec_to_normal(const dest_t &dest,char *data,int len)
+{
+	my_send(dest,data,len);
+	return 0;
+}
 int client_event_loop()
 {
 	//char buf[buf_len];
@@ -142,6 +157,14 @@ int client_event_loop()
 		myexit(-1);
 	}
 
+	ev.events = EPOLLIN;
+	ev.data.u64 = delay_manager.get_timer_fd();
+	ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
+	if (ret!= 0) {
+		mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
+		myexit(-1);
+	}
+
 	while(1)////////////////////////
 	{
 		if(about_to_exit) myexit(0);
@@ -208,7 +231,14 @@ int client_event_loop()
 				dest.type=type_fd64_conv;
 				dest.inner.fd64=remote_fd64;
 				dest.conv=conv;
-				my_send(dest,data,data_len);
+				from_normal_to_fec(dest,data,data_len);
+				//my_send(dest,data,data_len);
+			}
+		    else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) {
+				uint64_t value;
+				read(delay_manager.get_timer_fd(), &value, 8);
+				//printf("<timerfd_triggered, %d>",delay_mp.size());
+				//fflush(stdout);
 			}
 			else if(events[idx].data.u64>u32_t(-1) )
 			{
@@ -246,9 +276,11 @@ int client_event_loop()
 				dest_t dest;
 				dest.inner.ip_port.from_u64(u64);
 				dest.type=type_ip_port;
-				my_send(dest,new_data,new_len);
+				from_fec_to_normal(dest,new_data,new_len);
 				mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s());
 			}
+
+
 			/*
 			else if(events[idx].data.u64 ==(u64_t)timer_fd)
 			{
@@ -265,6 +297,7 @@ int client_event_loop()
 				myexit(-1);
 			}
 		}
+		delay_manager.check();
 	}
 	return 0;
 }
@@ -298,6 +331,13 @@ int server_event_loop()
 		myexit(-1);
 	}
 
+	ev.events = EPOLLIN;
+	ev.data.u64 = delay_manager.get_timer_fd();
+	ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
+	if (ret!= 0) {
+		mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
+		myexit(-1);
+	}
 
 	mylog(log_info,"now listening at %s:%d\n",my_ntoa(local_ip_uint32),local_port);
 	while(1)////////////////////////
@@ -352,12 +392,13 @@ int server_event_loop()
 				ip_port_t ip_port;
 				ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
 				ip_port.port=ntohs(udp_new_addr_in.sin_port);
-				if(!conn_manager.exist_ip_port(ip_port))
+				if(!conn_manager.exist(ip_port))
 				{
-					conn_info_t &conn_info=conn_manager.find_insert(ip_port);
+					conn_manager.insert(ip_port);
+					conn_info_t &conn_info=conn_manager.find(ip_port);
 					conn_info.conv_manager.reserve();
 				}
-				conn_info_t &conn_info=conn_manager.find_insert(ip_port);
+				conn_info_t &conn_info=conn_manager.find(ip_port);
 
 				u32_t conv;
 				char *new_data;
@@ -396,8 +437,7 @@ int server_event_loop()
 				dest_t dest;
 				dest.type=type_fd64;
 				dest.inner.fd64=fd64;
-				my_send(dest,new_data,new_len);
-
+				from_fec_to_normal(dest,new_data,new_len);
 
 				//int fd = int((u64 << 32u) >> 32u);
 				//////////////////////////////todo
@@ -437,6 +477,12 @@ int server_event_loop()
 					mylog(log_debug,"(events[idx].data.u64 >>32u) == 2u ,%llu,%llu,%llu  \n",begin_time,end_time,end_time-begin_time);
 				}
 			}*/
+		    else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) {
+				uint64_t value;
+				read(delay_manager.get_timer_fd(), &value, 8);
+				//printf("<timerfd_triggered, %d>",delay_mp.size());
+				//fflush(stdout);
+			}
 			else if (events[idx].data.u64 >u32_t(-1))
 			{
 				char data[buf_len];
@@ -452,11 +498,11 @@ int server_event_loop()
 				assert(fd_manager.exist_info(fd64));
 				ip_port_t ip_port=fd_manager.get_info(fd64).ip_port;
 
-				assert(conn_manager.exist_ip_port(ip_port));
+				assert(conn_manager.exist(ip_port));
 
-				conn_info_t* p_conn_info=conn_manager.find_insert_p(ip_port);
+				//conn_info_t* p_conn_info=conn_manager.find_insert_p(ip_port);
 
-				conn_info_t &conn_info=*p_conn_info;
+				conn_info_t &conn_info=conn_manager.find(ip_port);
 
 				assert(conn_info.conv_manager.is_u64_used(fd64));
 
@@ -484,7 +530,7 @@ int server_event_loop()
 				dest.type=type_ip_port_conv;
 				dest.conv=conv;
 				dest.inner.ip_port=ip_port;
-				my_send(dest,data,data_len);
+				from_normal_to_fec(dest,data,data_len);
 				mylog(log_trace,"[%s] send packet\n",ip_port.to_s());
 
 			}
@@ -495,6 +541,7 @@ int server_event_loop()
 			}
 
 		}
+		delay_manager.check();
 	}
 	return 0;
 }
@@ -514,7 +561,7 @@ int unit_test()
 	{
 		data[i]=arr[i];
 	}
-	rs_encode(code,data,3);
+	rs_encode2(3,6,data,3);
 	//printf("%d %d",(int)(unsigned char)arr[5][0],(int)('a'^'b'^'c'^'d'^'e'));
 
 	for(i=0;i<6;i++)
@@ -526,7 +573,7 @@ int unit_test()
 	//data[1]=0;
 	//data[5]=0;
 
-	int ret=rs_decode(code,data,3);
+	int ret=rs_decode2(3,6,data,3);
 	printf("ret:%d\n",ret);
 
 	for(i=0;i<6;i++)
@@ -891,7 +938,7 @@ int main(int argc, char *argv[])
 
 	local_ip_uint32=inet_addr(local_ip);
 	remote_ip_uint32=inet_addr(remote_ip);
-
+	fd_manager.reserve(10007);
 
 	if(program_mode==client_mode)
 	{

+ 1 - 1
packet.cpp

@@ -234,7 +234,7 @@ int send_fd (int fd,char * buf, int len,int flags)
 }
 //enum delay_type_t {none=0,enum_sendto_u64,enum_send_fd,client_to_local,client_to_remote,server_to_local,server_to_remote};
 
-int my_send(dest_t &dest,char *data,int len)
+int my_send(const dest_t &dest,char *data,int len)
 {
 	switch(dest.type)
 	{

+ 1 - 1
packet.h

@@ -24,7 +24,7 @@ extern int random_drop;
 extern int local_listen_fd;
 
 
-int my_send(dest_t &dest,char *data,int len);
+int my_send(const dest_t &dest,char *data,int len);
 
 void encrypt_0(char * input,int &len,char *key);
 void decrypt_0(char * input,int &len,char *key);