소스 검색

before change to multiplex

wangyu- 8 년 전
부모
커밋
45032e4a95
6개의 변경된 파일403개의 추가작업 그리고 197개의 파일을 삭제
  1. 2 1
      common.h
  2. 16 15
      conn_manager.cpp
  3. 5 5
      conn_manager.h
  4. 366 171
      main.cpp
  5. 11 4
      packet.cpp
  6. 3 1
      packet.h

+ 2 - 1
common.h

@@ -69,7 +69,8 @@ const u32_t conv_clear_interval=200;
 const u32_t timer_interval=400;
 const int conv_clear_ratio=40;
 const int conv_clear_min=5;
-const u32_t conv_timeout=20000;
+////const u32_t conv_timeout=180000;
+const u32_t conv_timeout=40000;//for test
 const int max_conv_num=10000;
 
 /*

+ 16 - 15
conn_manager.cpp

@@ -9,24 +9,24 @@
 
 int disable_conv_clear=0;
 
-conn_manager_t::conn_manager_t() {
+conn_manager_t_not_used::conn_manager_t_not_used() {
 	clear_it = fd_last_active_time.begin();
 	long long last_clear_time = 0;
 	rehash();
 	//clear_function=0;
 }
-conn_manager_t::~conn_manager_t() {
+conn_manager_t_not_used::~conn_manager_t_not_used() {
 	clear();
 }
-int conn_manager_t::get_size() {
+int conn_manager_t_not_used::get_size() {
 	return fd_to_u64.size();
 }
-void conn_manager_t::rehash() {
+void conn_manager_t_not_used::rehash() {
 	u64_to_fd.rehash(10007);
 	fd_to_u64.rehash(10007);
 	fd_last_active_time.rehash(10007);
 }
-void conn_manager_t::clear() {
+void conn_manager_t_not_used::clear() {
 	if (disable_conv_clear)
 		return;
 
@@ -41,28 +41,28 @@ void conn_manager_t::clear() {
 	clear_it = fd_last_active_time.begin();
 
 }
-int conn_manager_t::exist_fd(u32_t fd) {
+int conn_manager_t_not_used::exist_fd(u32_t fd) {
 	return fd_to_u64.find(fd) != fd_to_u64.end();
 }
-int conn_manager_t::exist_u64(u64_t u64) {
+int conn_manager_t_not_used::exist_u64(u64_t u64) {
 	return u64_to_fd.find(u64) != u64_to_fd.end();
 }
-u32_t conn_manager_t::find_fd_by_u64(u64_t u64) {
+u32_t conn_manager_t_not_used::find_fd_by_u64(u64_t u64) {
 	return u64_to_fd[u64];
 }
-u64_t conn_manager_t::find_u64_by_fd(u32_t fd) {
+u64_t conn_manager_t_not_used::find_u64_by_fd(u32_t fd) {
 	return fd_to_u64[fd];
 }
-int conn_manager_t::update_active_time(u32_t fd) {
+int conn_manager_t_not_used::update_active_time(u32_t fd) {
 	return fd_last_active_time[fd] = get_current_time();
 }
-int conn_manager_t::insert_fd(u32_t fd, u64_t u64) {
+int conn_manager_t_not_used::insert_fd(u32_t fd, u64_t u64) {
 	u64_to_fd[u64] = fd;
 	fd_to_u64[fd] = u64;
 	fd_last_active_time[fd] = get_current_time();
 	return 0;
 }
-int conn_manager_t::erase_fd(u32_t fd) {
+int conn_manager_t_not_used::erase_fd(u32_t fd) {
 	if (disable_conv_clear)
 		return 0;
 	u64_t u64 = fd_to_u64[fd];
@@ -81,21 +81,22 @@ int conn_manager_t::erase_fd(u32_t fd) {
 	fd_last_active_time.erase(fd);
 	return 0;
 }
+/*
 void conn_manager_t::check_clear_list() {
 	while (!clear_list.empty()) {
 		int fd = *clear_list.begin();
 		clear_list.pop_front();
 		erase_fd(fd);
 	}
-}
-int conn_manager_t::clear_inactive() {
+}*/
+int conn_manager_t_not_used::clear_inactive() {
 	if (get_current_time() - last_clear_time > conv_clear_interval) {
 		last_clear_time = get_current_time();
 		return clear_inactive0();
 	}
 	return 0;
 }
-int conn_manager_t::clear_inactive0() {
+int conn_manager_t_not_used::clear_inactive0() {
 	if (disable_conv_clear)
 		return 0;
 

+ 5 - 5
conn_manager.h

@@ -13,7 +13,7 @@
 
 extern int disable_conv_clear;
 
-struct conn_manager_t  //TODO change map to unordered map
+struct conn_manager_t_not_used  //TODO change map to unordered map
 {
 	//typedef hash_map map;
 	unordered_map<u64_t,u32_t> u64_to_fd;  //conv and u64 are both supposed to be uniq
@@ -29,9 +29,9 @@ struct conn_manager_t  //TODO change map to unordered map
 	//void (*clear_function)(uint64_t u64) ;
 
 	long long last_clear_time;
-	list<int> clear_list;
-	conn_manager_t();
-	~conn_manager_t();
+	//list<int> clear_list;
+	conn_manager_t_not_used();
+	~conn_manager_t_not_used();
 	int get_size();
 	void rehash();
 	void clear();
@@ -42,7 +42,7 @@ struct conn_manager_t  //TODO change map to unordered map
 	int update_active_time(u32_t fd);
 	int insert_fd(u32_t fd,u64_t u64);
 	int erase_fd(u32_t fd);
-	void check_clear_list();
+	//void check_clear_list();
 	int clear_inactive();
 	int clear_inactive0();
 

+ 366 - 171
main.cpp

@@ -3,7 +3,8 @@
 #include "git_version.h"
 #include "lib/rs.h"
 #include "packet.h"
-#include "conn_manager.h"
+//#include "conn_manager.h"
+#include "delay_manager.h"
 #include "classic.h"
 
 using  namespace std;
@@ -19,187 +20,364 @@ typedef int i32_t;
 int dup_num=1;
 int dup_delay_min=20;   //0.1ms
 int dup_delay_max=20;
-//int dup_first_delay=9000;   //0.1ms
 
 int jitter_min=0;
 int jitter_max=0;
 
-int random_number_fd=-1;
-
-int remote_fd=-1;
-int local_fd=-1;
-
-int local_listen_fd=-1;
-
+//int random_number_fd=-1;
 
 int mtu_warn=1350;
 u32_t remote_address_uint32=0;
-
 char local_address[100], remote_address[100];
 int local_port = -1, remote_port = -1;
-int multi_process_mode=0;
-
-
-
-
-
 
 u64_t last_report_time=0;
 int report_interval=0;
 
+//conn_manager_t conn_manager;
+delay_manager_t delay_manager;
 
+const int disable_conv_clear=0;
+struct conv_manager_t  // manage the udp connections
+{
+	//typedef hash_map map;
+	unordered_map<u64_t,u32_t> u64_to_conv;  //conv and u64 are both supposed to be uniq
+	unordered_map<u32_t,u64_t> conv_to_u64;
 
+	unordered_map<u32_t,u64_t> conv_last_active_time;
 
-conn_manager_t conn_manager;
-
-
-
-int VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV;
+	unordered_map<u32_t,u64_t>::iterator clear_it;
 
+	unordered_map<u32_t,u64_t>::iterator it;
+	unordered_map<u32_t,u64_t>::iterator old_it;
 
+	//void (*clear_function)(uint64_t u64) ;
 
+	long long last_clear_time;
 
+	conv_manager_t()
+	{
+		clear_it=conv_last_active_time.begin();
+		long long last_clear_time=0;
+		//clear_function=0;
+	}
+	~conv_manager_t()
+	{
+		clear();
+	}
+	int get_size()
+	{
+		return conv_to_u64.size();
+	}
+	void reserve()
+	{
+		u64_to_conv.reserve(10007);
+		conv_to_u64.reserve(10007);
+		conv_last_active_time.reserve(10007);
+	}
+	void clear()
+	{
+	/////	if(disable_conv_clear) return ;
 
-struct delay_data
-{
-	int fd;
-	int times_left;
-	char * data;
-	int len;
-	u64_t u64;
-};
-int delay_timer_fd;
-
+		if(program_mode==server_mode)
+		{
+			for(it=conv_to_u64.begin();it!=conv_to_u64.end();it++)
+			{
+				//int fd=int((it->second<<32u)>>32u);
+	//////			server_clear_function(  it->second);//////////////todo
+			}
+		}
+		u64_to_conv.clear();
+		conv_to_u64.clear();
+		conv_last_active_time.clear();
 
-multimap<my_time_t,delay_data> delay_mp;
+		clear_it=conv_last_active_time.begin();
 
-int add_to_delay_mp(int fd,int times_left,u32_t delay,char * buf,int len,u64_t u64)
-{
-	if(max_pending_packet!=0&&int(delay_mp.size()) >=max_pending_packet)
+	}
+	u32_t get_new_conv()
+	{
+		u32_t conv=get_true_random_number_nz();
+		while(conv_to_u64.find(conv)!=conv_to_u64.end())
+		{
+			conv=get_true_random_number_nz();
+		}
+		return conv;
+	}
+	int is_conv_used(u32_t conv)
 	{
-		mylog(log_warn,"max pending packet reached,ignored\n");
+		return conv_to_u64.find(conv)!=conv_to_u64.end();
+	}
+	int is_u64_used(u64_t u64)
+	{
+		return u64_to_conv.find(u64)!=u64_to_conv.end();
+	}
+	u32_t find_conv_by_u64(u64_t u64)
+	{
+		return u64_to_conv[u64];
+	}
+	u64_t find_u64_by_conv(u32_t conv)
+	{
+		return conv_to_u64[conv];
+	}
+	int update_active_time(u32_t conv)
+	{
+		return conv_last_active_time[conv]=get_current_time();
+	}
+	int insert_conv(u32_t conv,u64_t u64)
+	{
+		u64_to_conv[u64]=conv;
+		conv_to_u64[conv]=u64;
+		conv_last_active_time[conv]=get_current_time();
 		return 0;
 	}
-	delay_data tmp;
-	tmp.data = buf;
-	tmp.fd = fd;
-	tmp.times_left = times_left;
-	tmp.len = len;
-	tmp.u64=u64;
-	my_time_t tmp_time=get_current_time_us();
-	tmp_time+=delay*100;
-	delay_mp.insert(make_pair(tmp_time,tmp));
-
-	return 0;
-}
-int add_and_new(int fd,int times_left,u32_t delay,char * buf,int len,u64_t u64)
-{
-	if(times_left<=0) return -1;
-
-	char * str= (char *)malloc(len);
-	memcpy(str,buf,len);
-	add_to_delay_mp(fd,times_left,delay,str,len,u64);
-	return 0;
-}
-
-multimap<u64_t,delay_data> new_delay_mp;
-
-
-
-void handler(int num) {
-	int status;
-	int pid;
-	while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
-		if (WIFEXITED(status)) {
-			//printf("The child exit with code %d",WEXITSTATUS(status));
+	int erase_conv(u32_t conv)
+	{
+		if(disable_conv_clear) return 0;
+		u64_t u64=conv_to_u64[conv];
+		if(program_mode==server_mode)
+		{
+			//server_clear_function(u64);
+		}
+		conv_to_u64.erase(conv);
+		u64_to_conv.erase(u64);
+		conv_last_active_time.erase(conv);
+		return 0;
+	}
+	int clear_inactive(char * ip_port=0)
+	{
+		if(get_current_time()-last_clear_time>conv_clear_interval)
+		{
+			last_clear_time=get_current_time();
+			return clear_inactive0(ip_port);
 		}
+		return 0;
 	}
+	int clear_inactive0(char * ip_port)
+	{
+		if(disable_conv_clear) return 0;
 
-}
 
-void check_delay_map()
-{
-	if(!delay_mp.empty())
-	{
-		my_time_t current_time;
+		//map<uint32_t,uint64_t>::iterator it;
+		int cnt=0;
+		it=clear_it;
+		int size=conv_last_active_time.size();
+		int num_to_clean=size/conv_clear_ratio+conv_clear_min;   //clear 1/10 each time,to avoid latency glitch
+
+		num_to_clean=min(num_to_clean,size);
 
-		multimap<my_time_t,delay_data>::iterator it;
-		while(1)
+		u64_t current_time=get_current_time();
+		for(;;)
 		{
-			int ret=0;
-			it=delay_mp.begin();
-			if(it==delay_mp.end()) break;
+			if(cnt>=num_to_clean) break;
+			if(conv_last_active_time.begin()==conv_last_active_time.end()) break;
 
-			current_time=get_current_time_us();
-			if(it->first < current_time||it->first ==current_time)
+			if(it==conv_last_active_time.end())
 			{
-				if (is_client) {
-					if (conn_manager.exist_fd(it->second.fd)) {
-						u64_t u64 = conn_manager.find_u64_by_fd(it->second.fd);
-						if (u64 != it->second.u64) {
-							it->second.times_left = 0; //fd has been deleted and recreated
-							// 偷懒的做法
-						} else {
-							char new_data[buf_len];
-							int new_len = 0;
-							do_obscure(it->second.data, it->second.len,
-									new_data, new_len);
-							ret = send_fd(it->second.fd, new_data, new_len, 0);
-						}
-					} else {
-						it->second.times_left = 0;
-					}
-				} else {
-
-					if (conn_manager.exist_fd(it->second.fd)) {
-						u64_t u64 = conn_manager.find_u64_by_fd(it->second.fd);
-						if (u64 != it->second.u64) {
-							it->second.times_left = 0;//fd has been deleted and recreated
-							// 偷懒的做法
-						} else {
-							char new_data[buf_len];
-							int new_len = 0;
-							do_obscure(it->second.data, it->second.len,
-									new_data, new_len);
-							sendto_u64(local_listen_fd, new_data, new_len, 0,
-									u64);
-						}
-					} else {
-						it->second.times_left = 0;
-					}
-				}
-				if (ret < 0) {
-					mylog(log_debug, "send return %d at @300", ret);
-				}
-
+				it=conv_last_active_time.begin();
+			}
 
-				if(it->second.times_left>1)
+			if( current_time -it->second  >conv_timeout )
+			{
+				//mylog(log_info,"inactive conv %u cleared \n",it->first);
+				old_it=it;
+				it++;
+				u32_t conv= old_it->first;
+				erase_conv(old_it->first);
+				if(ip_port==0)
 				{
-					//delay_mp.insert(pair<my_time,delay_data>(current_time));
-					add_to_delay_mp(it->second.fd,it->second.times_left-1,random_between(dup_delay_min,dup_delay_max),it->second.data,it->second.len,it->second.u64);
+					mylog(log_info,"conv %x cleared\n",conv);
 				}
 				else
 				{
-					free(it->second.data);
+					mylog(log_info,"[%s]conv %x cleared\n",ip_port,conv);
 				}
-				delay_mp.erase(it);
 			}
 			else
 			{
-				break;
+				it++;
 			}
+			cnt++;
+		}
+		return 0;
+	}
+};//g_conv_manager;
+
+struct conn_info_t     //stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can
+//handle multiple clients
+{
+	conv_manager_t conv_manager;
+};
+struct conn_manager_t  //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections
+{
 
+ u32_t ready_num;
+
+ unordered_map<int,conn_info_t *> udp_fd_mp;  //a bit dirty to used pointer,but can void unordered_map search
+ unordered_map<int,conn_info_t *> timer_fd_mp;//we can use pointer here since unordered_map.rehash() uses shallow copy
+
+ unordered_map<id_t,conn_info_t *> const_id_mp;
+
+ unordered_map<u64_t,conn_info_t*> mp; //put it at end so that it de-consturcts first
+
+ unordered_map<u64_t,conn_info_t*>::iterator clear_it;
+
+ long long last_clear_time;
+
+ conn_manager_t()
+ {
+	 ready_num=0;
+	 mp.reserve(10007);
+	 clear_it=mp.begin();
+	 timer_fd_mp.reserve(10007);
+	 const_id_mp.reserve(10007);
+	 udp_fd_mp.reserve(100007);
+	 last_clear_time=0;
+	 //current_ready_ip=0;
+	// current_ready_port=0;
+ }
+ int exist(u32_t ip,uint16_t port)
+ {
+	 u64_t u64=0;
+	 u64=ip;
+	 u64<<=32u;
+	 u64|=port;
+	 if(mp.find(u64)!=mp.end())
+	 {
+		 return 1;
+	 }
+	 return 0;
+ }
+ /*
+ int insert(uint32_t ip,uint16_t port)
+ {
+	 uint64_t u64=0;
+	 u64=ip;
+	 u64<<=32u;
+	 u64|=port;
+	 mp[u64];
+	 return 0;
+ }*/
+ conn_info_t *& find_insert_p(u32_t ip,uint16_t port)  //be aware,the adress may change after rehash
+ {
+	 u64_t u64=0;
+	 u64=ip;
+	 u64<<=32u;
+	 u64|=port;
+	 unordered_map<u64_t,conn_info_t*>::iterator it=mp.find(u64);
+	 if(it==mp.end())
+	 {
+		 mp[u64]=new conn_info_t;
+	 }
+	 return mp[u64];
+ }
+ conn_info_t & find_insert(u32_t ip,uint16_t port)  //be aware,the adress may change after rehash
+ {
+	 u64_t u64=0;
+	 u64=ip;
+	 u64<<=32u;
+	 u64|=port;
+	 unordered_map<u64_t,conn_info_t*>::iterator it=mp.find(u64);
+	 if(it==mp.end())
+	 {
+		 mp[u64]=new conn_info_t;
+	 }
+	 return *mp[u64];
+ }
+ int erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it)
+ {
+		if(erase_it->second->state.server_current_state==server_ready)
+		{
+			ready_num--;
+			assert(i32_t(ready_num)!=-1);
+			assert(erase_it->second!=0);
+			assert(erase_it->second->timer_fd !=0);
+			assert(erase_it->second->oppsite_const_id!=0);
+			assert(const_id_mp.find(erase_it->second->oppsite_const_id)!=const_id_mp.end());
+			assert(timer_fd_mp.find(erase_it->second->timer_fd)!=timer_fd_mp.end());
+
+			const_id_mp.erase(erase_it->second->oppsite_const_id);
+			timer_fd_mp.erase(erase_it->second->timer_fd);
+			close(erase_it->second->timer_fd);// close will auto delte it from epoll
+			delete(erase_it->second);
+			mp.erase(erase_it->first);
 		}
-		if(!delay_mp.empty())
+		else
 		{
-			itimerspec its;
-			memset(&its.it_interval,0,sizeof(its.it_interval));
-			its.it_value.tv_sec=delay_mp.begin()->first/1000000llu;
-			its.it_value.tv_nsec=(delay_mp.begin()->first%1000000llu)*1000llu;
-			timerfd_settime(delay_timer_fd,TFD_TIMER_ABSTIME,&its,0);
+			assert(erase_it->second->blob==0);
+			assert(erase_it->second->timer_fd ==0);
+			assert(erase_it->second->oppsite_const_id==0);
+			delete(erase_it->second);
+			mp.erase(erase_it->first);
 		}
+		return 0;
+ }
+int clear_inactive()
+{
+	if(get_current_time()-last_clear_time>conn_clear_interval)
+	{
+		last_clear_time=get_current_time();
+		return clear_inactive0();
 	}
+	return 0;
 }
+int clear_inactive0()
+{
+	 unordered_map<u64_t,conn_info_t*>::iterator it;
+	 unordered_map<u64_t,conn_info_t*>::iterator old_it;
+
+	if(disable_conn_clear) return 0;
+
+	//map<uint32_t,uint64_t>::iterator it;
+	int cnt=0;
+	it=clear_it;
+	int size=mp.size();
+	int num_to_clean=size/conn_clear_ratio+conn_clear_min;   //clear 1/10 each time,to avoid latency glitch
+
+	mylog(log_trace,"mp.size() %d\n", size);
+
+	num_to_clean=min(num_to_clean,(int)mp.size());
+	u64_t current_time=get_current_time();
+
+	for(;;)
+	{
+		if(cnt>=num_to_clean) break;
+		if(mp.begin()==mp.end()) break;
+
+		if(it==mp.end())
+		{
+			it=mp.begin();
+		}
+
+		if(it->second->state.server_current_state==server_ready &&current_time - it->second->last_hb_recv_time  <=server_conn_timeout)
+		{
+				it++;
+		}
+		else if(it->second->state.server_current_state!=server_ready&& current_time - it->second->last_state_time  <=server_handshake_timeout )
+		{
+			it++;
+		}
+		else if(it->second->blob!=0&&it->second->blob->conv_manager.get_size() >0)
+		{
+			assert(it->second->state.server_current_state==server_ready);
+			it++;
+		}
+		else
+		{
+			mylog(log_info,"[%s:%d]inactive conn cleared \n",my_ntoa(it->second->raw_info.recv_info.src_ip),it->second->raw_info.recv_info.src_port);
+			old_it=it;
+			it++;
+			erase(old_it);
+		}
+		cnt++;
+	}
+	return 0;
+}
+
+}conn_manager;
+
+int VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV;
+
+
 
 int event_loop()
 {
@@ -245,19 +423,12 @@ int event_loop()
 
 
 
-	if ((delay_timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
-	{
-		mylog(log_fatal,"timer_fd create error");
-		myexit(1);
-	}
 	ev.events = EPOLLIN;
-	ev.data.fd = delay_timer_fd;
+	ev.data.fd = delay_manager.timer_fd();
+
+	epoll_ctl(epollfd, EPOLL_CTL_ADD, delay_manager.timer_fd, &ev);
 
-	itimerspec zero_its;
-	memset(&zero_its, 0, sizeof(zero_its));
 
-	timerfd_settime(delay_timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
-	epoll_ctl(epollfd, EPOLL_CTL_ADD, delay_timer_fd, &ev);
 	if (ret < 0)
 	{
 		mylog(log_fatal,"epoll_ctl return %d\n", ret);
@@ -329,12 +500,33 @@ int event_loop()
 					mylog(log_info,"new connection from %s:%d ,created new udp fd %d\n",my_ntoa(local_other.sin_addr.s_addr),ntohs(local_other.sin_port),new_udp_fd);
 					conn_manager.insert_fd(new_udp_fd,u64);
 				}
+
 				int new_udp_fd=conn_manager.find_fd_by_u64(u64);
 				conn_manager.update_active_time(new_udp_fd);
 				int ret;
 				if(is_client)
 				{
 					add_seq(data,data_len);
+					my_time_t sum=0;
+
+					for(int i=0;i<dup_num;i++)
+					{
+						printf("<%d>\n",i);
+						char new_data[buf_len];
+						int new_len=0;
+						do_obscure(data, data_len, new_data, new_len);
+						delay_data_t tmp;
+						tmp.type=enum_send_fd;
+						tmp.data=new_data;
+						tmp.len=new_len;
+						tmp.dest.fd=new_udp_fd;
+						if(i==0)
+							sum+=random_between(jitter_min,jitter_max)*100;
+						else
+							sum+=random_between(dup_delay_min,dup_delay_max)*100;
+						delay_manager.add(sum,tmp);
+					}
+					/*
 					if(jitter_max==0)
 					{
 						char new_data[buf_len];
@@ -349,20 +541,23 @@ int event_loop()
 					else
 					{
 						add_and_new(new_udp_fd, dup_num,random_between(jitter_min,jitter_max), data, data_len,u64);
-					}
+					}*/
 					packet_send_count++;
 				}
 				else
 				{
+					printf("i got a packet\n");
 					char new_data[buf_len];
 					int new_len;
 
 					if (de_obscure(data, data_len, new_data, new_len) != 0) {
+						printf("failed 1\n");
 						mylog(log_trace,"de_obscure failed \n");
 						continue;
 					}
 					//dup_packet_recv_count++;
 					if (remove_seq(new_data, new_len) != 0) {
+						printf("failed 2\n");
 						mylog(log_trace,"remove_seq failed \n");
 						continue;
 					}
@@ -390,10 +585,10 @@ int event_loop()
 								dup_packet_send_count);
 				}
 			}
-			else if (events[n].data.fd == delay_timer_fd)
+			else if (events[n].data.fd == delay_manager.timer_fd)
 			{
 				uint64_t value;
-				read(delay_timer_fd, &value, 8);
+				read(delay_manager.timer_fd, &value, 8);
 				//printf("<timerfd_triggered, %d>",delay_mp.size());
 				//fflush(stdout);
 			}
@@ -442,8 +637,8 @@ int event_loop()
 						continue;
 					}
 					//packet_recv_count++;
-					ret = sendto_u64(local_listen_fd, new_data,
-							new_len , 0,u64);
+					ret = sendto_u64(u64, new_data,
+							new_len , 0);
 					if (ret < 0) {
 						mylog(log_warn, "sento returned %d,%s\n", ret,strerror(errno));
 						//perror("ret<0");
@@ -453,22 +648,23 @@ int event_loop()
 				{
 					add_seq(data,data_len);
 
-					if(jitter_max==0)
+					my_time_t sum=0;
+					for(int i=0;i<dup_num;i++)
 					{
+						printf("<%d>\n",i);
 						char new_data[buf_len];
 						int new_len=0;
 						do_obscure(data, data_len, new_data, new_len);
-						ret = sendto_u64(local_listen_fd, new_data,
-								new_len , 0,u64);
-							add_and_new(udp_fd, dup_num - 1,random_between(dup_delay_min,dup_delay_max), data, data_len,u64);
-						if (ret < 0) {
-							mylog(log_warn, "sento returned %d,%s\n", ret,strerror(errno));
-							//perror("ret<0");
-						}
-					}
-					else
-					{
-							add_and_new(udp_fd, dup_num,random_between(jitter_min,jitter_max), data, data_len,u64);
+						delay_data_t tmp;
+						tmp.type=enum_sendto_u64;
+						tmp.data=new_data;
+						tmp.len=new_len;
+						tmp.dest.u64=u64;
+						if(i==0)
+							sum+=random_between(jitter_min,jitter_max)*100;
+						else
+							sum+=random_between(dup_delay_min,dup_delay_max)*100;
+						delay_manager.add(sum,tmp);
 					}
 					packet_send_count++;
 
@@ -481,8 +677,8 @@ int event_loop()
 
 			}
 		}
-		check_delay_map();
-		conn_manager.check_clear_list();
+		delay_manager.check();
+		//conn_manager.check_clear_list();
 		if(clear_triggered)   // 删除操作在epoll event的最后进行,防止event cache中的fd失效。
 		{
 			u64_t value;
@@ -653,7 +849,7 @@ void process_arg(int argc, char *argv[])
 		switch (opt)
 		{
 		case 'p':
-			multi_process_mode=1;
+			//multi_process_mode=1;
 			break;
 		case 'k':
 			sscanf(optarg,"%s\n",key_string);
@@ -856,10 +1052,13 @@ int main(int argc, char *argv[])
 		printf("this_program fec\n");
 		return 0;
 	}
-	if(argc==2&&strcmp(argv[1],"fec")!=0)
+	/*
+	if(argc>=2&&strcmp(argv[1],"fec")!=0)
 	{
+		printf("running into classic mode!\n");
 		return classic::main(argc,argv);
-	}
+	}*/
+
 	assert(sizeof(u64_t)==8);
 	assert(sizeof(i64_t)==8);
 	assert(sizeof(u32_t)==4);
@@ -867,17 +1066,13 @@ int main(int argc, char *argv[])
 	dup2(1, 2);		//redirect stderr to stdout
 	int i, j, k;
 	process_arg(argc,argv);
+	delay_manager.capacity=max_pending_packet;
 	init_random_number_fd();
 
 	remote_address_uint32=inet_addr(remote_address);
 
-	if(!multi_process_mode)
-	{
-		event_loop();
-	}
-	else
-	{
-	}
+
+	event_loop();
 
 
 	return 0;

+ 11 - 4
packet.cpp

@@ -23,6 +23,8 @@ int random_drop=0;
 
 char key_string[1000]= "secret key";
 
+int local_listen_fd=-1;
+
 struct anti_replay_t
 {
 	u64_t max_packet_received;
@@ -192,9 +194,10 @@ int de_obscure(const char * input, int in_len,char *output,int &out_len)
 }
 
 
-int sendto_u64 (int fd,char * buf, int len,int flags, u64_t u64)
+int sendto_fd_u64 (int fd,u64_t u64,char * buf, int len,int flags)
 {
 
+	/*
 	if(is_server)
 	{
 		dup_packet_send_count++;
@@ -205,7 +208,7 @@ int sendto_u64 (int fd,char * buf, int len,int flags, u64_t u64)
 		{
 			return 0;
 		}
-	}
+	}*/
 
 	sockaddr_in tmp_sockaddr;
 
@@ -220,9 +223,13 @@ int sendto_u64 (int fd,char * buf, int len,int flags, u64_t u64)
 			(struct sockaddr *) &tmp_sockaddr,
 			sizeof(tmp_sockaddr));
 }
-
+int sendto_u64 (u64_t u64,char * buf, int len,int flags)
+{
+	return sendto_fd_u64(local_listen_fd,u64,buf,len,flags);
+}
 int send_fd (int fd,char * buf, int len,int flags)
 {
+	/*
 	if(is_client)
 	{
 		dup_packet_send_count++;
@@ -233,7 +240,7 @@ int send_fd (int fd,char * buf, int len,int flags)
 		{
 			return 0;
 		}
-	}
+	}*/
 	return send(fd,buf,len,flags);
 }
 

+ 3 - 1
packet.h

@@ -20,6 +20,7 @@ extern u64_t dup_packet_recv_count;
 extern char key_string[1000];
 extern int disable_replay_filter;
 extern int random_drop;
+extern int local_listen_fd;
 
 void encrypt_0(char * input,int &len,char *key);
 void decrypt_0(char * input,int &len,char *key);
@@ -28,7 +29,8 @@ int remove_seq(char * data,int &data_len);
 int do_obscure(const char * input, int in_len,char *output,int &out_len);
 int de_obscure(const char * input, int in_len,char *output,int &out_len);
 
-int sendto_u64 (int fd,char * buf, int len,int flags, u64_t u64);
+int sendto_fd_u64 (int fd,u64_t u64,char * buf, int len,int flags);
+int sendto_u64 (u64_t u64,char * buf, int len,int flags);
 int send_fd (int fd,char * buf, int len,int flags);
 
 #endif /* PACKET_H_ */