Browse Source

doesnt work any more..

wangyu- 8 years ago
parent
commit
4680b1a3a1
3 changed files with 178 additions and 30 deletions
  1. 166 28
      fec_manager.cpp
  2. 10 1
      fec_manager.h
  3. 2 1
      main.cpp

+ 166 - 28
fec_manager.cpp

@@ -257,12 +257,21 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
     	for(int i=0;i<actual_data_num+actual_redundant_num;i++)
     	{
     		int tmp_idx=0;
-    		write_u32(buf[i]+tmp_idx,seq);
-    		tmp_idx+=sizeof(u32_t);
-    		buf[i][tmp_idx++]=(unsigned char)type;
-    		buf[i][tmp_idx++]=(unsigned char)actual_data_num;
-    		buf[i][tmp_idx++]=(unsigned char)actual_redundant_num;
-    		buf[i][tmp_idx++]=(unsigned char)i;
+
+        	write_u32(buf[i] + tmp_idx, seq);
+			tmp_idx += sizeof(u32_t);
+			buf[i][tmp_idx++] = (unsigned char) type;
+			if (type == 1 && i < actual_data_num)
+			{
+				buf[i][tmp_idx++] = (unsigned char) 0;
+				buf[i][tmp_idx++] = (unsigned char) 0;
+			} else
+			{
+				buf[i][tmp_idx++] = (unsigned char) actual_data_num;
+				buf[i][tmp_idx++] = (unsigned char) actual_redundant_num;
+			}
+			buf[i][tmp_idx++] = (unsigned char) i;
+
 
     		tmp_output_buf[i]=buf[i]+tmp_idx; //////caution ,trick here.
 
@@ -292,17 +301,65 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
 		rs_encode2(actual_data_num,actual_data_num+actual_redundant_num,tmp_output_buf,fec_len);
 
 		mylog(log_info,"!!! s= %d\n");
-    	ready_for_output=1;
-    	seq++;
-    	counter=0;
+
     	output_n=actual_data_num+actual_redundant_num;
     	blob_encode.clear();
 
-
 		itimerspec its;
 		memset(&its,0,sizeof(its));
 		timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
+
+    	if(encode_fast_send&&s!=0&&type==1)
+    	{
+			int packet_to_send[max_fec_packet_num+5]={0};
+			int packet_to_send_counter=0;
+
+			assert(counter!=0);
+			if(s!=0)
+				packet_to_send[packet_to_send_counter++]=counter-1;
+			for(int i=actual_data_num;i<actual_data_num+actual_redundant_num;i++)
+			{
+				{
+					packet_to_send[packet_to_send_counter++]=i;
+				}
+			}
+			output_n=packet_to_send_counter;//re write
+			for(int i=0;i<packet_to_send_counter;i++)
+			{
+				output_buf[i]=output_buf[packet_to_send[i]];
+				output_len[i]=output_len[packet_to_send[i]];
+			}
+
+    	}
+
+    	ready_for_output=1;
+    	seq++;
+    	counter=0;
+
 	}
+    else
+    {
+    	if(encode_fast_send&&s!=0&&type==1)
+    	{
+    		assert(counter>=1);
+    		assert(counter<=255);
+    		int buf_idx=counter-1;
+    		assert(ready_for_output==0);
+    		ready_for_output=1;
+    		output_n=1;
+
+    		int tmp_idx=0;
+    		write_u32(buf[buf_idx]+tmp_idx,seq);
+    		tmp_idx+=sizeof(u32_t);
+    		buf[buf_idx][tmp_idx++]=(unsigned char)type;
+    		buf[buf_idx][tmp_idx++]=(unsigned char)0;
+    		buf[buf_idx][tmp_idx++]=(unsigned char)0;
+    		buf[buf_idx][tmp_idx++]=(unsigned char)((u32_t)counter);
+
+    		output_len[0]=buf_s_len[buf_idx]+tmp_idx;
+    		output_buf[0]=buf[buf_idx];
+    	}
+    }
 
 	if(s!=0&&delayed_append)
 	{
@@ -356,6 +413,7 @@ int fec_decode_manager_t::input(char *s,int len)
 	int inner_index=(unsigned char)s[tmp_idx++];
 	len=len-tmp_idx;
 
+	mylog(log_info,"input\n");
 	if(len<0)
 	{
 		mylog(log_warn,"len<0\n");
@@ -377,30 +435,52 @@ int fec_decode_manager_t::input(char *s,int len)
 
 	if(data_num+redundant_num>max_fec_packet_num)
 	{
+		mylog(log_info,"failed here\n");
 		return -1;
 	}
 	if(!anti_replay.is_vaild(seq))
 	{
+		mylog(log_info,"failed here2\n");
 		return 0;
 	}
-	if(!mp[seq].empty())
+
+	int ok=1;
+	if(mp[seq].type==-1)
+		mp[seq].type=type;
+	else
+	{
+		if(mp[seq].type!=type) ok=0;
+	}
+
+	if(type==0) assert(data_num!=0);
+
+	if(data_num!=0)
 	{
-		int first_idx=mp[seq].begin()->second;
-		int ok=1;
-		if(fec_data[first_idx].data_num!=data_num)
-			ok=0;
-		if(fec_data[first_idx].redundant_num!=redundant_num)
-			ok=0;
-		if(fec_data[first_idx].type!=type)
-			ok=0;
-		if(type==0&&fec_data[first_idx].len!=len)
-			ok=0;
-		if(ok==0)
+		mp[seq].data_counter++;
+
+		if(mp[seq].data_num==-1)
 		{
-			return -1;
+			mp[seq].data_num=data_num;
+			mp[seq].redundant_num=redundant_num;
+			mp[seq].len=len;
+		}
+		else
+		{
+			if(mp[seq].data_num!=data_num||mp[seq].redundant_num!=redundant_num||mp[seq].len!=len)
+				ok=0;
 		}
 	}
 
+	if(ok==0)
+	{
+		mylog(log_info,"ok=0\n");
+		return -1;
+	}
+	else
+	{
+		mylog(log_info,"ok=1\n");
+	}
+
 	if(fec_data[index].used!=0)
 	{
 		u32_t tmp_seq=fec_data[index].seq;
@@ -423,12 +503,70 @@ int fec_decode_manager_t::input(char *s,int len)
 	fec_data[index].idx=inner_index;
 	fec_data[index].len=len;
 	memcpy(fec_data[index].buf,s+tmp_idx,len);
-	mp[seq][inner_index]=index;
+	mp[seq].group_mp[inner_index]=index;
+
+	map<int,int> &inner_mp=mp[seq].group_mp;
+
+
+	int about_to_fec=0;
+	if(type==0)
+	{
+		assert((int)inner_mp.size()<=data_num);
+		if((int)inner_mp.size()==data_num)
+			about_to_fec=1;
+	}
+	else
+	{
+		if(mp[seq].data_num!=-1)
+		{
+			data_num=mp[seq].data_num;
+			redundant_num=mp[seq].redundant_num;
+
+			if(mp[seq].data_counter>data_num)
+			{
+				mylog(log_warn,"unexpected mp[seq].data_counter>mp[seq].data_num\n");
+				anti_replay.set_invaild(seq);
+				return -1;
+			}
+			else if(mp[seq].data_counter==data_num)
+			{
+				anti_replay.set_invaild(seq);// dont do fec,but  still set invaild
+			}
+			else //mp[seq].data_counter < mp[seq].data_num
+			{
+				if((int)inner_mp.size()>data_num+1)
+				{
+					mylog(log_warn,"unexpected (int)inner_mp.size()>data_num+1\n");
+					anti_replay.set_invaild(seq);
+					return -1;
+				}
+
+				if((int)inner_mp.size()==data_num+1)
+				{
+					anti_replay.set_invaild(seq);
+					if(data_num==0)
+						return 0;
+					else
+					{
+						//mylog(log_warn,"data_num!=0\n"); //this is possible
+						return -1;
+					}
+				}
+
+				if((int)inner_mp.size()==data_num)
+					about_to_fec=1;
+			}
+		}
+		else
+		{
+
+		}
+		//for()
+	}
 
-	map<int,int> &inner_mp=mp[seq];
-	assert((int)inner_mp.size()<=data_num);
-	if((int)inner_mp.size()==data_num)
+	if(about_to_fec)
 	{
+		mylog(log_error,"fec here!\n");
 		if(type==0)
 		{
 			char *fec_tmp_arr[max_fec_packet_num+5]={0};
@@ -533,7 +671,7 @@ int fec_decode_manager_t::input(char *s,int len)
 
 		if(decode_fast_send)
 		{
-			if(type==1&&inner_index<data_num)
+			if(type==1&&data_num==0)
 			{
 				assert(ready_for_output==0);
 				output_n=1;

+ 10 - 1
fec_manager.h

@@ -135,12 +135,21 @@ struct fec_data_t
 	char buf[buf_len];
 	int len;
 };
+struct fec_group_t
+{
+	int type=-1;
+	int data_num=-1;
+	int redundant_num=-1;
+	int len=-1;
+	int data_counter=0;
+	map<int,int>  group_mp;
+};
 class fec_decode_manager_t
 {
 	anti_replay_t anti_replay;
 	fec_data_t fec_data[fec_buff_size];
 	int index;
-	unordered_map<u32_t, map<int,int> > mp;
+	unordered_map<u32_t, fec_group_t> mp;
 	blob_decode_t blob_decode;
 
 

+ 2 - 1
main.cpp

@@ -553,7 +553,7 @@ int server_event_loop()
 			{
 				uint64_t value;
 				read(timer.get_timer_fd(), &value, 8);
-				conn_manager.clear_inactive();
+				///conn_manager.clear_inactive();///////////////////////////remember to uncomment
 				//conn_info.conv_manager.clear_inactive();
 			}
 			else if (events[idx].data.u64 == (u64_t)local_listen_fd)
@@ -614,6 +614,7 @@ int server_event_loop()
 				int  out_n;char **out_arr;int *out_len;int *out_delay;
 				from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay);
 
+				mylog(log_info,"out_n= %d\n",out_n);
 				for(int i=0;i<out_n;i++)
 				{
 					u32_t conv;