wangyu- %!s(int64=8) %!d(string=hai) anos
pai
achega
479e60883c
Modificáronse 4 ficheiros con 243 adicións e 92 borrados
  1. 33 13
      fec_manager.cpp
  2. 20 7
      fec_manager.h
  3. 165 72
      main.cpp
  4. 25 0
      packet.cpp

+ 33 - 13
fec_manager.cpp

@@ -9,7 +9,7 @@
 #include "log.h"
 #include "common.h"
 #include "lib/rs.h"
-
+#include "fd_manager.h"
 
 blob_encode_t::blob_encode_t()
 {
@@ -38,10 +38,10 @@ int blob_encode_t::get_shard_len(int n,int next_packet_len)
 
 int blob_encode_t::input(char *s,int len)
 {
-	assert(current_len+len+sizeof(u16_t) <=256*buf_len);
+	assert(current_len+len+sizeof(u16_t) <=max_fec_packet_num*buf_len);
 	assert(len<=65535&&len>=0);
 	counter++;
-	assert(counter<=max_packet_num);
+	assert(counter<=max_normal_packet_num);
 	write_u16(buf+current_len,len);
 	current_len+=sizeof(u16_t);
 	memcpy(buf+current_len,s,len);
@@ -51,7 +51,6 @@ int blob_encode_t::input(char *s,int len)
 
 int blob_encode_t::output(int n,char ** &s_arr,int & len)
 {
-	static char *output_arr[256+100];
 	len=round_up_div(current_len,n);
 	write_u32(buf,counter);
 	for(int i=0;i<n;i++)
@@ -79,7 +78,7 @@ int blob_decode_t::input(char *s,int len)
 		assert(last_len==len);
 	}
 	counter++;
-	assert(counter<=256);
+	assert(counter<=max_fec_packet_num);
 	last_len=len;
 	assert(current_len+len+100<(int)sizeof(buf));
 	memcpy(buf+current_len,s,len);
@@ -88,15 +87,13 @@ int blob_decode_t::input(char *s,int len)
 }
 int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr)
 {
-	static char *s_buf[max_packet_num+100];
-	static int len_buf[max_packet_num+100];
 
 	int parser_pos=0;
 
 	if(parser_pos+(int)sizeof(u32_t)>current_len) return -1;
 
 	n=(int)read_u32(buf+parser_pos);
-	if(n>max_packet_num) {mylog(log_info,"failed 1\n");return -1;}
+	if(n>max_normal_packet_num) {mylog(log_info,"failed 1\n");return -1;}
 	s_arr=s_buf;
 	len_arr=len_buf;
 
@@ -115,23 +112,46 @@ int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr)
 
 fec_encode_manager_t::fec_encode_manager_t()
 {
-	re_init(4,2,1200);
+	//int timer_fd;
+	if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
+	{
+		mylog(log_fatal,"timer_fd create error");
+		myexit(1);
+	}
+	timer_fd64=fd_manager.create(timer_fd);
+
+	re_init(4,2,1200,100,10000);
+}
+fec_encode_manager_t::~fec_encode_manager_t()
+{
+	fd_manager.close(timer_fd64);
 }
-int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu)
+u64_t fec_encode_manager_t::get_timer_fd64()
+{
+	return timer_fd64;
+}
+int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time)
 {
 	fec_data_num=data_num;
 	fec_redundant_num=redundant_num;
 	fec_mtu=mtu;
+	fec_pending_num=pending_num;
 
 	counter=0;
 	blob_encode.clear();
 	ready_for_output=0;
 	seq=0;
+
+	itimerspec zero_its;
+	memset(&zero_its, 0, sizeof(zero_its));
+
+	timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
+
 	return 0;
 }
 int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
 {
-    if(s==0 ||blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu)
+    if(s==0 ||blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu||counter>=fec_pending_num)
 	{
     	char ** blob_output;
     	int blob_len;
@@ -226,7 +246,7 @@ int fec_decode_manager_t::input(char *s,int len)
 	{
 		return -1;
 	}
-	if(data_num+redundant_num>255)
+	if(data_num+redundant_num>max_fec_packet_num)
 	{
 		return -1;
 	}
@@ -277,7 +297,7 @@ int fec_decode_manager_t::input(char *s,int len)
 	if((int)inner_mp.size()==data_num)
 	{
 
-		char *fec_tmp_arr[256+5]={0};
+		char *fec_tmp_arr[max_fec_packet_num+5]={0};
 		for(auto it=inner_mp.begin();it!=inner_mp.end();it++)
 		{
 			fec_tmp_arr[it->first]=fec_data[it->second].buf;

+ 20 - 7
fec_manager.h

@@ -12,8 +12,8 @@
 #include "log.h"
 #include "lib/rs.h"
 
-const int max_packet_num=1000;
-
+const int max_normal_packet_num=1000;
+const int max_fec_packet_num=255;
 const u32_t anti_replay_buff_size=30000;
 const u32_t fec_buff_size=3000;
 
@@ -58,10 +58,13 @@ struct anti_replay_t
 
 struct blob_encode_t
 {
-	char buf[(256+5)*buf_len];
+	char buf[(max_fec_packet_num+5)*buf_len];
 	int current_len;
 	int counter;
 
+
+	char *output_arr[max_fec_packet_num+100];
+
 	blob_encode_t();
 
     int clear();
@@ -76,11 +79,14 @@ struct blob_encode_t
 
 struct blob_decode_t
 {
-	char buf[(256+5)*buf_len];
+	char buf[(max_fec_packet_num+5)*buf_len];
 	int current_len;
 	int last_len;
 	int counter;
 
+	char *s_buf[max_normal_packet_num+100];
+	int len_buf[max_normal_packet_num+100];
+
 	blob_decode_t();
 	int clear();
 	int input(char *input,int len);
@@ -91,17 +97,23 @@ class fec_encode_manager_t
 {
 	int fec_data_num,fec_redundant_num;
 	int fec_mtu;
-	char buf[256+5][buf_len+100];
-	char *output_buf[256+5];
+	int fec_pending_num;
+	int fec_pending_time;
+	char buf[max_fec_packet_num+5][buf_len+100];
+	char *output_buf[max_fec_packet_num+5];
 	int output_len;
 	int ready_for_output;
 	u32_t seq;
 	int counter;
+	int timer_fd;
+	u64_t timer_fd64;
 
 	blob_encode_t blob_encode;
 public:
 	fec_encode_manager_t();
-	int re_init(int data_num,int redundant_num,int mtu);
+
+	u64_t get_timer_fd64();
+	int re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time);
 	int input(char *s,int len/*,int &is_first_packet*/);
 	int output(int &n,char ** &s_arr,int &len);
 };
@@ -124,6 +136,7 @@ class fec_decode_manager_t
 	unordered_map<u32_t, map<int,int> > mp;
 	blob_decode_t blob_decode;
 
+
 	int output_n;
 	char ** output_s_arr;
 	int * output_len_arr;

+ 165 - 72
main.cpp

@@ -30,7 +30,8 @@ int mtu_warn=1350;
 int fec_data_num=3;
 int fec_redundant_num=2;
 int fec_mtu=30;
-
+int fec_pending_num=5;
+int fec_pending_time=10000;
 u32_t local_ip_uint32,remote_ip_uint32=0;
 char local_ip[100], remote_ip[100];
 int local_port = -1, remote_port = -1;
@@ -110,44 +111,95 @@ int delay_send(my_time_t delay,const dest_t &dest,char *data,int len)
 {
 	return delay_manager.add(delay,dest,data,len);;
 }
-int from_normal_to_fec(conn_info_t & conn_info,const dest_t &dest,char *data,int len)
+int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,int *&out_delay)
 {
+	static int out_delay_buf[max_fec_packet_num+100]={0};
+	static int out_len_buf[max_fec_packet_num+100]={0};
 	static int counter=0;
+	out_delay=out_delay_buf;
+	out_len=out_len_buf;
+
+	if(0)
+	{
+		if(data==0) return 0;
+		out_n=1;
+		static char *data_static;
+		data_static=data;
+		static int len_static;
+		len_static=len;
+		out_arr=&data_static;
+		out_len=&len_static;
+	}
+	else
+	{
 	counter++;
 
 	conn_info.fec_encode_manager.input(data,len);
+
 	//if(counter%5==0)
 		//conn_info.fec_encode_manager.input(0,0);
 
-	int n;
-	char **s_arr;
-	int s_arr_len;
+	//int n;
+	//char **s_arr;
+	//int s_len;
 
-	conn_info.fec_encode_manager.output(n,s_arr,s_arr_len);
+	int tmp_out_len;
+	conn_info.fec_encode_manager.output(out_n,out_arr,tmp_out_len);
 
-	for(int i=0;i<n;i++)
+	for(int i=0;i<out_n;i++)
 	{
-		delay_send(0,dest,s_arr[i],s_arr_len);
+		out_len_buf[i]=tmp_out_len;
+		out_delay_buf[i]=100000*i;
 	}
+
+	}
+
+	//for(int i=0;i<n;i++)
+	//{
+		//delay_send(0,dest,s_arr[i],s_len);
+	//}
 	//delay_send(0,dest,data,len);
 	//delay_send(1000*1000,dest,data,len);
 	return 0;
 }
-int from_fec_to_normal(conn_info_t & conn_info,const dest_t &dest,char *data,int len)
+int from_fec_to_normal(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,int *&out_delay)
 {
+	static int out_delay_buf[max_normal_packet_num+100]={0};
+	out_delay=out_delay_buf;
+	if(0)
+	{
+		if(data==0) return 0;
+		out_n=1;
+		static char *data_static;
+		data_static=data;
+		static int len_static;
+		len_static=len;
+		out_arr=&data_static;
+		out_len=&len_static;
+		//out_len_buf[0]=len;
+	}
+	else
+	{
+
 	conn_info.fec_decode_manager.input(data,len);
 
-	int n;char ** s_arr;int* len_arr;
-	conn_info.fec_decode_manager.output(n,s_arr,len_arr);
+	//int n;char ** s_arr;int* len_arr;
+	conn_info.fec_decode_manager.output(out_n,out_arr,out_len);
+	for(int i=0;i<out_n;i++)
+	{
+		out_delay_buf[i]=100000*i;
+	}
 
+	}
 
 //	printf("<n:%d>",n);
+	/*
 	for(int i=0;i<n;i++)
 	{
 		delay_send(0,dest,s_arr[i],len_arr[i]);
 		//s_arr[i][len_arr[i]]=0;
 		//printf("<%s>\n",s_arr[i]);
-	}
+	}*/
 	//my_send(dest,data,len);
 	return 0;
 }
@@ -164,7 +216,7 @@ int client_event_loop()
     conn_info_t &conn_info=*conn_info_p;  //huge size of conn_info,do not allocate on stack
 	init_listen_socket();
 
-	conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu);
+	conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time);
 
 	epoll_fd = epoll_create1(0);
 
@@ -203,6 +255,11 @@ int client_event_loop()
 		myexit(-1);
 	}
 
+	u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64();
+	ev.events = EPOLLIN;
+	ev.data.u64 = fd64;
+	ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev);
+
 	while(1)////////////////////////
 	{
 		if(about_to_exit) myexit(0);
@@ -266,14 +323,20 @@ int client_event_loop()
 
 
 				dest_t dest;
-				dest.type=type_fd64_conv;
+				dest.type=type_fd64;
 				dest.inner.fd64=remote_fd64;
 
-				//char * new_data;
-				//int new_len;
-				//put_conv(conv,data,data_len,new_data,new_len);
-				dest.conv=conv;
-				from_normal_to_fec(conn_info,dest,data,data_len);
+				char * new_data;
+				int new_len;
+				put_conv(conv,data,data_len,new_data,new_len);
+
+				int  out_n;char **out_arr;int *out_len;int *out_delay;
+				//dest.conv=conv;
+				from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay);
+				for(int i=0;i<out_n;i++)
+				{
+					delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
+				}
 				//my_send(dest,data,data_len);
 			}
 		    else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) {
@@ -308,18 +371,28 @@ int client_event_loop()
 				{
 					mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
 				}
-				u32_t conv;
-				char *new_data;
-				int new_len;
-				if(get_conv(conv,data,data_len,new_data,new_len)!=0)
-					continue;
-				if(!conn_info.conv_manager.is_conv_used(conv))continue;
-				u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv);
-				dest_t dest;
-				dest.inner.ip_port.from_u64(u64);
-				dest.type=type_ip_port;
-				from_fec_to_normal(conn_info,dest,new_data,new_len);
-				mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s());
+
+
+				int  out_n;char **out_arr;int *out_len;int *out_delay;
+				from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay);
+
+				for(int i=0;i<out_n;i++)
+				{
+					u32_t conv;
+					char *new_data;
+					int new_len;
+					if(get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0)
+						continue;
+					if(!conn_info.conv_manager.is_conv_used(conv))continue;
+					u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv);
+					dest_t dest;
+					dest.inner.ip_port.from_u64(u64);
+					dest.type=type_ip_port;
+					//dest.conv=conv;
+
+					delay_send(out_delay[i],dest,new_data,new_len);
+				}
+				//mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s());
 			}
 
 
@@ -438,51 +511,64 @@ int server_event_loop()
 				{
 					conn_manager.insert(ip_port);
 					conn_info_t &conn_info=conn_manager.find(ip_port);
-					conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu);
+					conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time);
 					conn_info.conv_manager.reserve();
+
+					u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64();
+					ev.events = EPOLLIN;
+					ev.data.u64 = fd64;
+					ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev);
+
+					fd_manager.get_info(fd64).ip_port=ip_port;
 				}
 				conn_info_t &conn_info=conn_manager.find(ip_port);
 
+				int  out_n;char **out_arr;int *out_len;int *out_delay;
+				from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay);
 
-				u32_t conv;
-				char *new_data;
-				int new_len;
-				if(get_conv(conv,data,data_len,new_data,new_len)!=0)
-					continue;
+				for(int i=0;i<out_n;i++)
+				{
+					u32_t conv;
+					char *new_data;
+					int new_len;
+					if(get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0)
+						continue;
 
-				/*
-				id_t tmp_conv_id;
-				memcpy(&tmp_conv_id,&data_[0],sizeof(tmp_conv_id));
-				tmp_conv_id=ntohl(tmp_conv_id);*/
+					/*
+					id_t tmp_conv_id;
+					memcpy(&tmp_conv_id,&data_[0],sizeof(tmp_conv_id));
+					tmp_conv_id=ntohl(tmp_conv_id);*/
 
-				if (!conn_info.conv_manager.is_conv_used(conv))
-				{
-					int new_udp_fd;
-					ret=new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port);
+					if (!conn_info.conv_manager.is_conv_used(conv))
+					{
+						int new_udp_fd;
+						ret=new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port);
 
-					if (ret != 0) {
-						mylog(log_warn, "[%s:%d]new_connected_socket failed\n",my_ntoa(ip_port.ip),ip_port.port);
-						continue;
-					}
+						if (ret != 0) {
+							mylog(log_warn, "[%s:%d]new_connected_socket failed\n",my_ntoa(ip_port.ip),ip_port.port);
+							continue;
+						}
 
-					fd64_t fd64 = fd_manager.create(new_udp_fd);
-					ev.events = EPOLLIN;
-					ev.data.u64 = fd64;
-					ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_udp_fd, &ev);
+						fd64_t fd64 = fd_manager.create(new_udp_fd);
+						ev.events = EPOLLIN;
+						ev.data.u64 = fd64;
+						ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_udp_fd, &ev);
 
-					conn_info.conv_manager.insert_conv(conv, fd64);
-					fd_manager.get_info(fd64).ip_port=ip_port;
-					//assert(!conn_manager.exist_fd64(fd64));
+						conn_info.conv_manager.insert_conv(conv, fd64);
+						fd_manager.get_info(fd64).ip_port=ip_port;
+						//assert(!conn_manager.exist_fd64(fd64));
 
-					//conn_manager.insert_fd64(fd64,ip_port);
+						//conn_manager.insert_fd64(fd64,ip_port);
+					}
+					fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv);
+					//int fd=fd_manager.fd64_to_fd(fd64);
+					dest_t dest;
+					dest.type=type_fd64;
+					dest.inner.fd64=fd64;
+
+					//dest.conv=conv;
+					delay_send(out_delay[i],dest,new_data,new_len);
 				}
-				fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv);
-				//int fd=fd_manager.fd64_to_fd(fd64);
-				dest_t dest;
-				dest.type=type_fd64;
-				dest.inner.fd64=fd64;
-				from_fec_to_normal(conn_info,dest,new_data,new_len);
-
 				//int fd = int((u64 << 32u) >> 32u);
 				//////////////////////////////todo
 
@@ -531,6 +617,7 @@ int server_event_loop()
 			{
 				char data[buf_len];
 				int data_len;
+				u32_t conv;
 				fd64_t fd64=events[idx].data.u64;
 				if(!fd_manager.exist(fd64))   //fd64 has been closed
 				{
@@ -550,7 +637,7 @@ int server_event_loop()
 
 				assert(conn_info.conv_manager.is_u64_used(fd64));
 
-				u32_t conv=conn_info.conv_manager.find_conv_by_u64(fd64);
+				conv=conn_info.conv_manager.find_conv_by_u64(fd64);
 
 				int fd=fd_manager.to_fd(fd64);
 				data_len=recv(fd,data,max_data_len,0);
@@ -571,16 +658,22 @@ int server_event_loop()
 				}
 
 				dest_t dest;
-				dest.type=type_ip_port_conv;
-				dest.conv=conv;
+				dest.type=type_ip_port;
+				//dest.conv=conv;
 				dest.inner.ip_port=ip_port;
 
-				//char * new_data;
-				//int new_len;
-				//put_conv(conv,data,data_len,new_data,new_len);
+				char * new_data;
+				int new_len;
+				put_conv(conv,data,data_len,new_data,new_len);
+
+				int  out_n;char **out_arr;int *out_len;int *out_delay;
 
-				from_normal_to_fec(conn_info,dest,data,data_len);
-				mylog(log_trace,"[%s] send packet\n",ip_port.to_s());
+				from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay);
+				for(int i=0;i<out_n;i++)
+				{
+					delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
+				}
+				//mylog(log_trace,"[%s] send packet\n",ip_port.to_s());
 
 			}
 			else

+ 25 - 0
packet.cpp

@@ -251,3 +251,28 @@ int get_conv(u32_t &conv,const char *input,int len_in,char *&output,int &len_out
 	}
 	return 0;
 }
+
+int put_conv1(u32_t conv,const char * input,int len_in,char *&output,int &len_out)
+{
+	static char buf[buf_len];
+	output=buf;
+	u32_t n_conv=htonl(conv);
+	memcpy(output,&n_conv,sizeof(n_conv));
+	memcpy(output+sizeof(n_conv),input,len_in);
+	len_out=len_in+(int)(sizeof(n_conv));
+	return 0;
+}
+int get_conv1(u32_t &conv,const char *input,int len_in,char *&output,int &len_out )
+{
+	u32_t n_conv;
+	memcpy(&n_conv,input,sizeof(n_conv));
+	conv=ntohl(n_conv);
+	output=(char *)input+sizeof(n_conv);
+	len_out=len_in-(int)sizeof(n_conv);
+	if(len_out<0)
+	{
+		mylog(log_debug,"len_out<0\n");
+		return -1;
+	}
+	return 0;
+}