wangyu- %!s(int64=7) %!d(string=hai) anos
pai
achega
1c9c5f1d39
Modificáronse 9 ficheiros con 494 adicións e 304 borrados
  1. 1 1
      common.cpp
  2. 7 1
      connection.h
  3. 26 14
      delay_manager.cpp
  4. 16 4
      delay_manager.h
  5. 18 5
      fec_manager.cpp
  6. 30 8
      fec_manager.h
  7. 1 0
      my_ev_common.h
  8. 356 238
      tunnel_client.cpp
  9. 39 33
      tunnel_server.cpp

+ 1 - 1
common.cpp

@@ -587,7 +587,7 @@ int new_listen_socket(int &fd,u32_t ip,int port)
 	setnonblocking(fd);
     set_buf_size(fd,socket_buf_size);
 
-    mylog(log_debug,"local_listen_fd=%d\n,",fd);
+    mylog(log_debug,"local_listen_fd=%d\n",fd);
 
 	return 0;
 }

+ 7 - 1
connection.h

@@ -106,10 +106,16 @@ struct conn_info_t     //stores info for a raw connection.for client ,there is o
 	conv_manager_t conv_manager;
 	fec_encode_manager_t fec_encode_manager;
 	fec_decode_manager_t fec_decode_manager;
-	my_timer_t timer;
+	ev_timer timer;
+	//my_timer_t timer;
 	//ip_port_t ip_port;
 	u64_t last_active_time;
 	stat_t stat;
+
+	int local_listen_fd;
+	int remote_fd;
+	fd64_t remote_fd64;
+
 	conn_info_t()
 	{
 	}

+ 26 - 14
delay_manager.cpp

@@ -18,16 +18,16 @@ delay_manager_t::delay_manager_t()
 {
 	capacity=0;
 
-	if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
-	{
-		mylog(log_fatal,"timer_fd create error");
-		myexit(1);
-	}
+	//if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
+	//{
+	//	mylog(log_fatal,"timer_fd create error");
+	//	myexit(1);
+	//}
 
-	itimerspec zero_its;
-	memset(&zero_its, 0, sizeof(zero_its));
+	//itimerspec zero_its;
+	//memset(&zero_its, 0, sizeof(zero_its));
 
-	timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
+	//timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
 
 }
 delay_manager_t::~delay_manager_t()
@@ -35,10 +35,12 @@ delay_manager_t::~delay_manager_t()
 	//TODO ,we currently dont need to deconstruct it
 }
 
+/*
 int delay_manager_t::get_timer_fd()
 {
 	return timer_fd;
-}
+}*/
+
 //int add(my_time_t delay,const dest_t &dest,const char *data,int len);
 int delay_manager_t::add(my_time_t delay,const dest_t &dest,char *data,int len)
 {
@@ -78,6 +80,8 @@ int delay_manager_t::add(my_time_t delay,const dest_t &dest,char *data,int len)
 
 	delay_mp.insert(make_pair(tmp_time,tmp));
 
+	////check();  check everytime when add, is it better ??
+
 	return 0;
 }
 
@@ -112,11 +116,19 @@ int delay_manager_t::check()
 		}
 		if(!delay_mp.empty())
 		{
-			itimerspec its;
-			memset(&its.it_interval,0,sizeof(its.it_interval));
-			its.it_value.tv_sec=delay_mp.begin()->first/1000000llu;
-			its.it_value.tv_nsec=(delay_mp.begin()->first%1000000llu)*1000llu;
-			timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
+			//itimerspec its;
+			//memset(&its.it_interval,0,sizeof(its.it_interval));
+			//its.it_value.tv_sec=delay_mp.begin()->first/1000000llu;
+			//its.it_value.tv_nsec=(delay_mp.begin()->first%1000000llu)*1000llu;
+			//timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
+
+			ev_timer_stop(loop, &timer);
+			ev_timer_set(&timer, delay_mp.begin()->first /1000000.0 - ev_now(loop),0 );   //we should use ev_now here.
+			ev_timer_start(loop, &timer);
+		}
+		else
+		{
+			ev_timer_stop(loop, &timer); //not necessary
 		}
 	}
 	return 0;

+ 16 - 4
delay_manager.h

@@ -28,7 +28,7 @@ union dest_t
 	u64_t u64;
 };
 */
-
+/*
 struct my_timer_t
 {
 	int timer_fd;
@@ -101,7 +101,9 @@ struct my_timer_t
 		timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
 		return 0;
 	}
-};
+};*/
+
+
 struct delay_data_t
 {
 	dest_t dest;
@@ -113,7 +115,11 @@ struct delay_data_t
 
 struct delay_manager_t
 {
-	int timer_fd;
+	ev_timer timer;
+	struct ev_loop *loop=0;
+	void (*cb) (struct ev_loop *loop, struct ev_timer *watcher, int revents)=0;
+
+	//int timer_fd;
 	int capacity;
 	multimap<my_time_t,delay_data_t> delay_mp;  //unit us,1 us=0.001ms
 	delay_manager_t();
@@ -121,9 +127,15 @@ struct delay_manager_t
 	{
 		assert(0==1);
 	}
+	void set_loop_and_cb(struct ev_loop *loop,void (*cb) (struct ev_loop *loop, struct ev_timer *watcher, int revents))
+	{
+		this->loop=loop;
+		this->cb=cb;
+		ev_init(&timer,cb);
+	}
 	int set_capacity(int a){capacity=a;return 0;}
 	~delay_manager_t();
-	int get_timer_fd();
+	ev_timer& get_timer();
 	int check();
 	int add(my_time_t delay,const dest_t &dest,char *data,int len);
 };

+ 18 - 5
fec_manager.cpp

@@ -130,21 +130,27 @@ int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr)
 
 fec_encode_manager_t::~fec_encode_manager_t()
 {
-	fd_manager.fd64_close(timer_fd64);
+	clear();
+	//fd_manager.fd64_close(timer_fd64);
 }
+/*
 u64_t fec_encode_manager_t::get_timer_fd64()
 {
 	return timer_fd64;
-}
+}*/
+
 fec_encode_manager_t::fec_encode_manager_t()
 {
 	//int timer_fd;
+
+	/*
 	if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
 	{
 		mylog(log_fatal,"timer_fd create error");
 		myexit(1);
 	}
-	timer_fd64=fd_manager.create(timer_fd);
+	timer_fd64=fd_manager.create(timer_fd);*/
+
 
 	reset_fec_parameter(g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_timeout,g_fec_mode);
 
@@ -175,7 +181,13 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/)
 		my_time_t tmp_time=fec_timeout+first_packet_time;
 		its.it_value.tv_sec=tmp_time/1000000llu;
 		its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu;
-		timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
+		//timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
+
+		ev_timer_stop(loop, &timer);
+		ev_timer_set(&timer, tmp_time/1000000.0 - ev_now(loop) ,0 );   //we should use ev_now here.
+		ev_timer_start(loop, &timer);
+
+		//ev_timer_set(loop,)
 	}
 	if(fec_mode==0)//for type 0 use blob
 	{
@@ -402,7 +414,8 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
 
 		itimerspec its;
 		memset(&its,0,sizeof(its));
-		timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
+		ev_timer_stop(loop, &timer);
+		//timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
 
     	if(encode_fast_send&&fec_mode==1)
     	{

+ 30 - 8
fec_manager.h

@@ -112,6 +112,7 @@ struct blob_decode_t
 
 class fec_encode_manager_t
 {
+
 private:
 	u32_t seq;
 
@@ -133,30 +134,51 @@ private:
 	int output_len[max_fec_packet_num+100];
 
 	int counter;
-	int timer_fd;
-	u64_t timer_fd64;
+	//int timer_fd;
+	//u64_t timer_fd64;
 
 	int ready_for_output;
 	u32_t output_n;
 
-
 	int append(char *s,int len);
 
+	ev_timer timer;
+	struct ev_loop *loop=0;
+	void (*cb) (struct ev_loop *loop, struct ev_timer *watcher, int revents)=0;
+
 public:
 	fec_encode_manager_t();
 	~fec_encode_manager_t();
 
+	void set_data(void * data)
+	{
+		timer.data=data;
+	}
+
+
+	void set_loop_and_cb(struct ev_loop *loop,void (*cb) (struct ev_loop *loop, struct ev_timer *watcher, int revents))
+	{
+		this->loop=loop;
+		this->cb=cb;
+		ev_init(&timer,cb);
+	}
+
 	int clear()
 	{
 		counter=0;
 		blob_encode.clear();
 		ready_for_output=0;
 
-		itimerspec zero_its;
-		memset(&zero_its, 0, sizeof(zero_its));
-
-		timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
+		//itimerspec zero_its;
+		//memset(&zero_its, 0, sizeof(zero_its));
+		//timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
 
+		if(loop)
+		{
+			ev_timer_stop(loop,&timer);
+			loop=0;
+			cb=0;
+		}
 		seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug.
 
 		return 0;
@@ -176,7 +198,7 @@ public:
 	{
 		return fec_mode;
 	}
-	u64_t get_timer_fd64();
+	//u64_t get_timer_fd64();
 	int reset_fec_parameter(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type);
 	int input(char *s,int len/*,int &is_first_packet*/);
 	int output(int &n,char ** &s_arr,int *&len);

+ 1 - 0
my_ev_common.h

@@ -12,4 +12,5 @@
 
 #define EV_STANDALONE 1
 #define EV_COMMON  void *data; unsigned long long u64;
+#define EV_COMPAT3 0
 //#define EV_VERIFY 2

+ 356 - 238
tunnel_client.cpp

@@ -1,95 +1,387 @@
 #include "tunnel.h"
 
+void data_from_local_or_fec_timeout(conn_info_t & conn_info,int is_time_out)
+{
+	fd64_t &remote_fd64=conn_info.remote_fd64;
+	int & local_listen_fd=conn_info.local_listen_fd;
+
+	char data[buf_len];
+	int data_len;
+	ip_port_t ip_port;
+	u32_t conv;
+	int  out_n;char **out_arr;int *out_len;my_time_t *out_delay;
+	dest_t dest;
+	dest.type=type_fd64;
+	dest.inner.fd64=remote_fd64;
+	dest.cook=1;
+
+	if(is_time_out)
+	{
+		//fd64_t fd64=events[idx].data.u64;
+		mylog(log_trace,"events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()\n");
+
+		//uint64_t value;
+		//if(!fd_manager.exist(fd64))   //fd64 has been closed
+		//{
+		//	mylog(log_trace,"!fd_manager.exist(fd64)");
+		//	continue;
+		//}
+		//if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8)
+		//{
+		//	mylog(log_trace,"(ret=read(fd_manager.to_fd(fd64), &value, 8))!=8,ret=%d\n",ret);
+		//	continue;
+		//}
+		//if(value==0)
+		//{
+		//	mylog(log_debug,"value==0\n");
+		//	continue;
+		//}
+		//assert(value==1);
+		from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
+	}
+	else//events[idx].data.u64 == (u64_t)local_listen_fd
+	{
+		mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n");
+		struct sockaddr_in udp_new_addr_in={0};
+		socklen_t udp_new_addr_len = sizeof(sockaddr_in);
+		if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0,
+				(struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) {
+			mylog(log_error,"recv_from error,this shouldnt happen,err=%s,but we can try to continue\n",strerror(errno));
+			return;
+		};
+
+		if(!disable_mtu_warn&&data_len>=mtu_warn)
+		{
+			mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
+		}
+		mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
+				ntohs(udp_new_addr_in.sin_port),data_len);
+
+		ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
+		ip_port.port=ntohs(udp_new_addr_in.sin_port);
+
+		u64_t u64=ip_port.to_u64();
+
+		if(!conn_info.conv_manager.is_u64_used(u64))
+		{
+			if(conn_info.conv_manager.get_size() >=max_conv_num)
+			{
+				mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n");
+				return;
+			}
+			conv=conn_info.conv_manager.get_new_conv();
+			conn_info.conv_manager.insert_conv(conv,u64);
+			mylog(log_info,"new packet from %s:%d,conv_id=%x\n",inet_ntoa(udp_new_addr_in.sin_addr),ntohs(udp_new_addr_in.sin_port),conv);
+		}
+		else
+		{
+			conv=conn_info.conv_manager.find_conv_by_u64(u64);
+			mylog(log_trace,"conv=%d\n",conv);
+		}
+		conn_info.conv_manager.update_active_time(conv);
+		char * new_data;
+		int new_len;
+		put_conv(conv,data,data_len,new_data,new_len);
+
+
+		mylog(log_trace,"data_len=%d new_len=%d\n",data_len,new_len);
+		from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay);
+
+	}
+	mylog(log_trace,"out_n=%d\n",out_n);
+	for(int i=0;i<out_n;i++)
+	{
+		delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
+	}
+}
+static void local_listen_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
+{
+	assert(!(revents&EV_ERROR));
+
+	conn_info_t & conn_info= *((conn_info_t*)watcher->data);
+
+	data_from_local_or_fec_timeout(conn_info,0);
+}
+
+static void remote_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
+{
+	assert(!(revents&EV_ERROR));
+
+	conn_info_t & conn_info= *((conn_info_t*)watcher->data);
+
+	char data[buf_len];
+	if(!fd_manager.exist(watcher->u64))   //fd64 has been closed
+	{
+		mylog(log_trace,"!fd_manager.exist(events[idx].data.u64)");
+		return;
+	}
+	fd64_t &remote_fd64=conn_info.remote_fd64;
+	int &remote_fd=conn_info.remote_fd;
+
+	assert(watcher->u64==remote_fd64);
+
+	int fd=fd_manager.to_fd(remote_fd64);
+
+	int data_len =recv(fd,data,max_data_len,0);
+	mylog(log_trace, "received data from udp fd %d, len=%d\n", remote_fd,data_len);
+	if(data_len<0)
+	{
+		if(errno==ECONNREFUSED)
+		{
+			mylog(log_debug, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
+		}
+
+		mylog(log_warn, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
+		return;
+	}
+	if(!disable_mtu_warn&&data_len>mtu_warn)
+	{
+		mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
+	}
+
+	if(de_cook(data,data_len)!=0)
+	{
+		mylog(log_debug,"de_cook error");
+		return;
+	}
+
+	int  out_n;char **out_arr;int *out_len;my_time_t *out_delay;
+	from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay);
+
+	mylog(log_trace,"out_n=%d\n",out_n);
+
+	for(int i=0;i<out_n;i++)
+	{
+		u32_t conv;
+		char *new_data;
+		int new_len;
+		if(get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0)
+		{
+			mylog(log_debug,"get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0");
+			continue;
+		}
+		if(!conn_info.conv_manager.is_conv_used(conv))
+		{
+			mylog(log_trace,"!conn_info.conv_manager.is_conv_used(conv)");
+			continue;
+		}
+
+		conn_info.conv_manager.update_active_time(conv);
+
+		u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv);
+		dest_t dest;
+		dest.inner.fd_ip_port.fd=conn_info.local_listen_fd;
+		dest.inner.fd_ip_port.ip_port.from_u64(u64);
+		dest.type=type_fd_ip_port;
+
+		delay_send(out_delay[i],dest,new_data,new_len);
+	}
+}
+
+static void fifo_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
+{
+	assert(!(revents&EV_ERROR));
+	int fifo_fd=watcher->fd;
+
+	char buf[buf_len];
+	int len=read (fifo_fd, buf, sizeof (buf));
+	if(len<0)
+	{
+		mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno));
+		return;
+	}
+	buf[len]=0;
+	handle_command(buf);
+}
+
+static void delay_manager_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents)
+{
+	assert(!(revents&EV_ERROR));
+
+	//do nothing
+}
+
+static void fec_encode_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents)
+{
+	assert(!(revents&EV_ERROR));
+
+	conn_info_t & conn_info= *((conn_info_t*)watcher->data);
+
+	data_from_local_or_fec_timeout(conn_info,1);
+
+}
+
+static void conn_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents)
+{
+	assert(!(revents&EV_ERROR));
+
+	uint64_t value;
+
+	conn_info_t & conn_info= *((conn_info_t*)watcher->data);
+
+	//read(conn_info.timer.get_timer_fd(), &value, 8);
+	conn_info.conv_manager.clear_inactive();
+	mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n");
+
+	conn_info.stat.report_as_client();
+
+	if(debug_force_flush_fec)
+	{
+		int  out_n;char **out_arr;int *out_len;my_time_t *out_delay;
+		dest_t dest;
+		dest.type=type_fd64;
+		dest.inner.fd64=conn_info.remote_fd64;
+		dest.cook=1;
+		from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
+		for(int i=0;i<out_n;i++)
+		{
+			delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
+		}
+	}
+}
+
+static void prepare_cb(struct ev_loop *loop, struct ev_prepare *watcher, int revents)
+{
+	assert(!(revents&EV_ERROR));
+
+	delay_manager.check();
+}
+
 int tunnel_client_event_loop()
 {
 	int i, j, k;int ret;
 	int yes = 1;
-	int epoll_fd;
-	int remote_fd;
-	fd64_t remote_fd64;
+	//int epoll_fd;
+
 
     conn_info_t *conn_info_p=new conn_info_t;
     conn_info_t &conn_info=*conn_info_p;  //huge size of conn_info,do not allocate on stack
 
-	int local_listen_fd;
+	int &local_listen_fd=conn_info.local_listen_fd;
     new_listen_socket(local_listen_fd,local_ip_uint32,local_port);
 
-	epoll_fd = epoll_create1(0);
-	assert(epoll_fd>0);
+	//epoll_fd = epoll_create1(0);
+	//assert(epoll_fd>0);
 
-	const int max_events = 4096;
-	struct epoll_event ev, events[max_events];
-	if (epoll_fd < 0) {
-		mylog(log_fatal,"epoll return %d\n", epoll_fd);
-		myexit(-1);
-	}
+	//const int max_events = 4096;
+	//struct epoll_event ev, events[max_events];
+	//if (epoll_fd < 0) {
+	//	mylog(log_fatal,"epoll return %d\n", epoll_fd);
+	//	myexit(-1);
+	//}
 
-	ev.events = EPOLLIN;
-	ev.data.u64 = local_listen_fd;
-	ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, local_listen_fd, &ev);
-	if (ret!=0) {
-		mylog(log_fatal,"add  udp_listen_fd error\n");
-		myexit(-1);
-	}
+	struct ev_loop * loop= ev_default_loop(0);
+	assert(loop != NULL);
+
+	//ev.events = EPOLLIN;
+	//ev.data.u64 = local_listen_fd;
+	//ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, local_listen_fd, &ev);
+	//if (ret!=0) {
+	//	mylog(log_fatal,"add  udp_listen_fd error\n");
+	//	myexit(-1);
+	//}
+	struct ev_io local_listen_watcher;
+	local_listen_watcher.data=&conn_info;
+
+    ev_io_init(&local_listen_watcher, local_listen_cb, local_listen_fd, EV_READ);
+    ev_io_start(loop, &local_listen_watcher);
+
+    int & remote_fd=conn_info.remote_fd;
+    fd64_t &remote_fd64=conn_info.remote_fd64;
 
 	assert(new_connected_socket(remote_fd,remote_ip_uint32,remote_port)==0);
 	remote_fd64=fd_manager.create(remote_fd);
 
 	mylog(log_debug,"remote_fd64=%llu\n",remote_fd64);
 
-	ev.events = EPOLLIN;
-	ev.data.u64 = remote_fd64;
+	//ev.events = EPOLLIN;
+	//ev.data.u64 = remote_fd64;
 
-	ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, remote_fd, &ev);
-	if (ret!= 0) {
-		mylog(log_fatal,"add raw_fd error\n");
-		myexit(-1);
-	}
+	//ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, remote_fd, &ev);
+	//if (ret!= 0) {
+	//	mylog(log_fatal,"add raw_fd error\n");
+	//	myexit(-1);
+	//}
 
-	ev.events = EPOLLIN;
-	ev.data.u64 = delay_manager.get_timer_fd();
+	struct ev_io remote_watcher;
+	remote_watcher.data=&conn_info;
+	remote_watcher.u64=remote_fd64;
 
-	mylog(log_debug,"delay_manager.get_timer_fd()=%d\n",delay_manager.get_timer_fd());
-	ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
-	if (ret!= 0) {
-		mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
-		myexit(-1);
-	}
+    ev_io_init(&remote_watcher, remote_cb, remote_fd, EV_READ);
+    ev_io_start(loop, &remote_watcher);
 
-	u64_t tmp_fd64=conn_info.fec_encode_manager.get_timer_fd64();
-	ev.events = EPOLLIN;
-	ev.data.u64 = tmp_fd64;
 
-	mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64());
-	ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(tmp_fd64), &ev);
-	if (ret!= 0) {
-		mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n");
-		myexit(-1);
-	}
+	//ev.events = EPOLLIN;
+	//ev.data.u64 = delay_manager.get_timer_fd();
+
+	//mylog(log_debug,"delay_manager.get_timer_fd()=%d\n",delay_manager.get_timer_fd());
+	//ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
+	//if (ret!= 0) {
+	//	mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
+	//	myexit(-1);
+	//}
+
+    delay_manager.set_loop_and_cb(loop,delay_manager_cb);
+
+    conn_info.fec_encode_manager.set_data(&conn_info);
+    conn_info.fec_encode_manager.set_loop_and_cb(loop,fec_encode_cb);
+
+	//u64_t tmp_fd64=conn_info.fec_encode_manager.get_timer_fd64();
+	//ev.events = EPOLLIN;
+	//ev.data.u64 = tmp_fd64;
+
+	//mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64());
+	//ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(tmp_fd64), &ev);
+	//if (ret!= 0) {
+	//	mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n");
+	//	myexit(-1);
+	//}
+
+    conn_info.timer.data=&conn_info;
+    ev_init(&conn_info.timer,conn_timer_cb);
+    ev_timer_set(&conn_info.timer, 0, timer_interval/1000.0 );
+    ev_timer_start(loop,&conn_info.timer);
+	//conn_info.timer.add_fd_to_epoll(epoll_fd);
+	//conn_info.timer.set_timer_repeat_us(timer_interval*1000);
 
-	conn_info.timer.add_fd_to_epoll(epoll_fd);
-	conn_info.timer.set_timer_repeat_us(timer_interval*1000);
+	//mylog(log_debug,"conn_info.timer.get_timer_fd()=%d\n",conn_info.timer.get_timer_fd());
 
-	mylog(log_debug,"conn_info.timer.get_timer_fd()=%d\n",conn_info.timer.get_timer_fd());
 
 
 
+    struct ev_io fifo_watcher;
+
 	int fifo_fd=-1;
 
 	if(fifo_file[0]!=0)
 	{
 		fifo_fd=create_fifo(fifo_file);
-		ev.events = EPOLLIN;
-		ev.data.u64 = fifo_fd;
+		//ev.events = EPOLLIN;
+		//ev.data.u64 = fifo_fd;
+
+		//ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fifo_fd, &ev);
+		//if (ret!= 0) {
+		//	mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno));
+		//	myexit(-1);
+		//}
+		//mylog(log_info,"fifo_file=%s\n",fifo_file);
+
+	    ev_io_init(&fifo_watcher, fifo_cb, fifo_fd, EV_READ);
+	    ev_io_start(loop, &fifo_watcher);
 
-		ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fifo_fd, &ev);
-		if (ret!= 0) {
-			mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno));
-			myexit(-1);
-		}
-		mylog(log_info,"fifo_file=%s\n",fifo_file);
 	}
 
+	ev_prepare prepare_watcher;
+	ev_init(&prepare_watcher,prepare_cb);
+	ev_prepare_start(loop,&prepare_watcher);
+
+
+	ev_run(loop, 0);
+
+	mylog(log_warn,"ev_run returned\n");
+	myexit(0);
+
+	/*
 	while(1)////////////////////////
 	{
 		if(about_to_exit) myexit(0);
@@ -110,199 +402,25 @@ int tunnel_client_event_loop()
 		for (idx = 0; idx < nfds; ++idx) {
 			if(events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd())
 			{
-				uint64_t value;
-				read(conn_info.timer.get_timer_fd(), &value, 8);
-				conn_info.conv_manager.clear_inactive();
-				mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n");
-
-				conn_info.stat.report_as_client();
-
-				if(debug_force_flush_fec)
-				{
-				int  out_n;char **out_arr;int *out_len;my_time_t *out_delay;
-				dest_t dest;
-				dest.type=type_fd64;
-				dest.inner.fd64=remote_fd64;
-				dest.cook=1;
-				from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
-				for(int i=0;i<out_n;i++)
-				{
-					delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
-				}
-				}
+
 			}
+
 			else if (events[idx].data.u64 == (u64_t)fifo_fd)
 			{
-				char buf[buf_len];
-				int len=read (fifo_fd, buf, sizeof (buf));
-				if(len<0)
-				{
-					mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno));
-					continue;
-				}
-				buf[len]=0;
-				handle_command(buf);
+
 			}
 			else if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64())
 			{
-				char data[buf_len];
-				int data_len;
-				ip_port_t ip_port;
-				u32_t conv;
-				int  out_n;char **out_arr;int *out_len;my_time_t *out_delay;
-				dest_t dest;
-				dest.type=type_fd64;
-				dest.inner.fd64=remote_fd64;
-				dest.cook=1;
-
-				if(events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64())
-				{
-					fd64_t fd64=events[idx].data.u64;
-					mylog(log_trace,"events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()\n");
-
-					uint64_t value;
-					if(!fd_manager.exist(fd64))   //fd64 has been closed
-					{
-						mylog(log_trace,"!fd_manager.exist(fd64)");
-						continue;
-					}
-					if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8)
-					{
-						mylog(log_trace,"(ret=read(fd_manager.to_fd(fd64), &value, 8))!=8,ret=%d\n",ret);
-						continue;
-					}
-					if(value==0)
-					{
-						mylog(log_debug,"value==0\n");
-						continue;
-					}
-					assert(value==1);
-					from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
-				}
-				else//events[idx].data.u64 == (u64_t)local_listen_fd
-				{
-					mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n");
-					struct sockaddr_in udp_new_addr_in={0};
-					socklen_t udp_new_addr_len = sizeof(sockaddr_in);
-					if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0,
-							(struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) {
-						mylog(log_error,"recv_from error,this shouldnt happen,err=%s,but we can try to continue\n",strerror(errno));
-						continue;
-					};
-
-					if(!disable_mtu_warn&&data_len>=mtu_warn)
-					{
-						mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
-					}
-					mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
-							ntohs(udp_new_addr_in.sin_port),data_len);
-
-					ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
-					ip_port.port=ntohs(udp_new_addr_in.sin_port);
-
-					u64_t u64=ip_port.to_u64();
-
-					if(!conn_info.conv_manager.is_u64_used(u64))
-					{
-						if(conn_info.conv_manager.get_size() >=max_conv_num)
-						{
-							mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n");
-							continue;
-						}
-						conv=conn_info.conv_manager.get_new_conv();
-						conn_info.conv_manager.insert_conv(conv,u64);
-						mylog(log_info,"new packet from %s:%d,conv_id=%x\n",inet_ntoa(udp_new_addr_in.sin_addr),ntohs(udp_new_addr_in.sin_port),conv);
-					}
-					else
-					{
-						conv=conn_info.conv_manager.find_conv_by_u64(u64);
-						mylog(log_trace,"conv=%d\n",conv);
-					}
-					conn_info.conv_manager.update_active_time(conv);
-					char * new_data;
-					int new_len;
-					put_conv(conv,data,data_len,new_data,new_len);
-
-
-					mylog(log_trace,"data_len=%d new_len=%d\n",data_len,new_len);
-					from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay);
-
-				}
-				mylog(log_trace,"out_n=%d\n",out_n);
-				for(int i=0;i<out_n;i++)
-				{
-					delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
-				}
+
 			}
 		    else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) {
-				uint64_t value;
-				read(delay_manager.get_timer_fd(), &value, 8);
-				mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n");
+				//uint64_t value;
+				//read(delay_manager.get_timer_fd(), &value, 8);
+				//mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n");
 			}
 			else if(events[idx].data.u64>u32_t(-1) )
 			{
-				char data[buf_len];
-				if(!fd_manager.exist(events[idx].data.u64))   //fd64 has been closed
-				{
-					mylog(log_trace,"!fd_manager.exist(events[idx].data.u64)");
-					continue;
-				}
-				assert(events[idx].data.u64==remote_fd64);
-				int fd=fd_manager.to_fd(remote_fd64);
-				int data_len =recv(fd,data,max_data_len,0);
-				mylog(log_trace, "received data from udp fd %d, len=%d\n", remote_fd,data_len);
-				if(data_len<0)
-				{
-					if(errno==ECONNREFUSED)
-					{
-						mylog(log_debug, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
-					}
-
-					mylog(log_warn, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
-					continue;
-				}
-				if(!disable_mtu_warn&&data_len>mtu_warn)
-				{
-					mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
-				}
-
-				if(de_cook(data,data_len)!=0)
-				{
-					mylog(log_debug,"de_cook error");
-					continue;
-				}
-
-				int  out_n;char **out_arr;int *out_len;my_time_t *out_delay;
-				from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay);
-
-				mylog(log_trace,"out_n=%d\n",out_n);
-
-				for(int i=0;i<out_n;i++)
-				{
-					u32_t conv;
-					char *new_data;
-					int new_len;
-					if(get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0)
-					{
-						mylog(log_debug,"get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0");
-						continue;
-					}
-					if(!conn_info.conv_manager.is_conv_used(conv))
-					{
-						mylog(log_trace,"!conn_info.conv_manager.is_conv_used(conv)");
-						continue;
-					}
-
-					conn_info.conv_manager.update_active_time(conv);
-
-					u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv);
-					dest_t dest;
-					dest.inner.fd_ip_port.fd=local_listen_fd;
-					dest.inner.fd_ip_port.ip_port.from_u64(u64);
-					dest.type=type_fd_ip_port;
-
-					delay_send(out_delay[i],dest,new_data,new_len);
-				}
+
 			}
 			else
 			{
@@ -310,7 +428,7 @@ int tunnel_client_event_loop()
 				myexit(-1);
 			}
 		}
-		delay_manager.check();
-	}
+		//delay_manager.check();
+	}*/
 	return 0;
 }

+ 39 - 33
tunnel_server.cpp

@@ -6,22 +6,56 @@
  */
 
 #include "tunnel.h"
+void data_from_local_or_fec_timeout(conn_info_t & conn_info,int from_local)
+{
+
+}
+
+static void local_listen_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
+{
+
+}
+
+static void remote_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
+{
 
+}
+
+static void fifo_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
+{
+
+}
+
+static void delay_manager_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents)
+{
+
+}
+
+static void fec_encode_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents)
+{
+
+}
+
+static void conn_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents)
+{
+
+}
+
+static void prepare_cb(struct ev_loop *loop, struct ev_prepare *watcher, int revents)
+{
+
+}
 
 int tunnel_server_event_loop()
 {
-#if 0
-	//char buf[buf_len];
+
 	int i, j, k;int ret;
 	int yes = 1;
 	int epoll_fd;
 	int remote_fd;
 
-//    conn_info_t conn_info;
 	int local_listen_fd;
-//	fd64_t local_listen_fd64;
     new_listen_socket(local_listen_fd,local_ip_uint32,local_port);
-   // local_listen_fd64=fd_manager.create(local_listen_fd);
 
 	epoll_fd = epoll_create1(0);
 	assert(epoll_fd>0);
@@ -85,7 +119,6 @@ int tunnel_server_event_loop()
 			if(errno==EINTR  )
 			{
 				mylog(log_info,"epoll interrupted by signal,continue\n");
-				//myexit(0);
 			}
 			else
 			{
@@ -96,22 +129,12 @@ int tunnel_server_event_loop()
 		int idx;
 		for (idx = 0; idx < nfds; ++idx)
 		{
-			/*
-			if ((events[idx].data.u64 ) == (u64_t)timer_fd)
-			{
-				conn_manager.clear_inactive();
-				u64_t dummy;
-				read(timer_fd, &dummy, 8);
-				//current_time_rough=get_current_time();
-			}
-			else */
 			if(events[idx].data.u64==(u64_t)timer.get_timer_fd())
 			{
 				uint64_t value;
 				read(timer.get_timer_fd(), &value, 8);
 				conn_manager.clear_inactive();
 				mylog(log_trace,"events[idx].data.u64==(u64_t)timer.get_timer_fd()\n");
-				//conn_info.conv_manager.clear_inactive();
 			}
 
 			else if (events[idx].data.u64 == (u64_t)fifo_fd)
@@ -131,7 +154,6 @@ int tunnel_server_event_loop()
 			{
 
 				mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n");
-				//int recv_len;
 				char data[buf_len];
 				int data_len;
 				struct sockaddr_in udp_new_addr_in={0};
@@ -140,7 +162,6 @@ int tunnel_server_event_loop()
 						(struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) {
 					mylog(log_error,"recv_from error,this shouldnt happen,err=%s,but we can try to continue\n",strerror(errno));
 					continue;
-					//myexit(1);
 				};
 				mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
 						ntohs(udp_new_addr_in.sin_port),data_len);
@@ -172,8 +193,6 @@ int tunnel_server_event_loop()
 
 					conn_manager.insert(ip_port);
 					conn_info_t &conn_info=conn_manager.find(ip_port);
-					//conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type);
-					//conn_info.conv_manager.reserve();  //already reserved in constructor
 
 					u64_t fec_fd64=conn_info.fec_encode_manager.get_timer_fd64();
 					mylog(log_debug,"fec_fd64=%llu\n",fec_fd64);
@@ -212,10 +231,6 @@ int tunnel_server_event_loop()
 						continue;
 					}
 
-					/*
-					id_t tmp_conv_id;
-					memcpy(&tmp_conv_id,&data_[0],sizeof(tmp_conv_id));
-					tmp_conv_id=ntohl(tmp_conv_id);*/
 
 					if (!conn_info.conv_manager.is_conv_used(conv))
 					{
@@ -243,17 +258,12 @@ int tunnel_server_event_loop()
 
 
 						mylog(log_info,"[%s]new conv %x,fd %d created,fd64=%llu\n",ip_port.to_s(),conv,new_udp_fd,fd64);
-						//assert(!conn_manager.exist_fd64(fd64));
-
-						//conn_manager.insert_fd64(fd64,ip_port);
 					}
 					conn_info.conv_manager.update_active_time(conv);
 					fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv);
-					//int fd=fd_manager.fd64_to_fd(fd64);
 					dest_t dest;
 					dest.type=type_fd64;
 					dest.inner.fd64=fd64;
-					//dest.conv=conv;
 					delay_send(out_delay[i],dest,new_data,new_len);
 				}
 			}
@@ -280,7 +290,6 @@ int tunnel_server_event_loop()
 				assert(conn_manager.exist(ip_port));
 
 				conn_info_t &conn_info=conn_manager.find(ip_port);
-				//conn_info.update_active_time(); //cant put it here
 
 				int  out_n=-2;char **out_arr;int *out_len;my_time_t *out_delay;
 
@@ -292,7 +301,6 @@ int tunnel_server_event_loop()
 
 				if(fd64==conn_info.fec_encode_manager.get_timer_fd64())
 				{
-					//mylog(log_infol,"timer!!!\n");
 					uint64_t value;
 					if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8)
 					{
@@ -357,7 +365,6 @@ int tunnel_server_event_loop()
 				{
 					delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
 				}
-				//mylog(log_trace,"[%s] send packet\n",ip_port.to_s());
 
 			}
 			else
@@ -368,7 +375,6 @@ int tunnel_server_event_loop()
 		}
 		delay_manager.check();
 	}
-#endif
 	return 0;
 
 }