|
@@ -11,14 +11,16 @@
|
|
|
#include "lib/rs.h"
|
|
|
#include "fd_manager.h"
|
|
|
|
|
|
-int g_fec_data_num=20;
|
|
|
-int g_fec_redundant_num=10;
|
|
|
-int g_fec_mtu=1250;
|
|
|
-int g_fec_queue_len=200;
|
|
|
-int g_fec_timeout=8*1000; //8ms
|
|
|
-int g_fec_mode=0;
|
|
|
+//int g_fec_data_num=20;
|
|
|
+//int g_fec_redundant_num=10;
|
|
|
+//int g_fec_mtu=1250;
|
|
|
+//int g_fec_queue_len=200;
|
|
|
+//int g_fec_timeout=8*1000; //8ms
|
|
|
+//int g_fec_mode=0;
|
|
|
|
|
|
-int dynamic_update_fec=1;
|
|
|
+fec_parameter_t g_fec_par;
|
|
|
+
|
|
|
+//int dynamic_update_fec=1;
|
|
|
|
|
|
const int encode_fast_send=1;
|
|
|
const int decode_fast_send=1;
|
|
@@ -153,10 +155,13 @@ fec_encode_manager_t::fec_encode_manager_t()
|
|
|
timer_fd64=fd_manager.create(timer_fd);*/
|
|
|
|
|
|
|
|
|
- reset_fec_parameter(g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_timeout,g_fec_mode);
|
|
|
+ /////reset_fec_parameter(g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_timeout,g_fec_mode);
|
|
|
|
|
|
+ fec_par.clone(g_fec_par);
|
|
|
+ clear_data();
|
|
|
|
|
|
}
|
|
|
+/*
|
|
|
int fec_encode_manager_t::reset_fec_parameter(int data_num,int redundant_num,int mtu,int queue_len,int timeout,int mode)
|
|
|
{
|
|
|
fec_data_num=data_num;
|
|
@@ -172,7 +177,7 @@ int fec_encode_manager_t::reset_fec_parameter(int data_num,int redundant_num,int
|
|
|
|
|
|
clear_data();
|
|
|
return 0;
|
|
|
-}
|
|
|
+}*/
|
|
|
int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/)
|
|
|
{
|
|
|
if(counter==0)
|
|
@@ -180,7 +185,7 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/)
|
|
|
my_itimerspec its;
|
|
|
memset(&its.it_interval,0,sizeof(its.it_interval));
|
|
|
first_packet_time=get_current_time_us();
|
|
|
- my_time_t tmp_time=fec_timeout+first_packet_time;
|
|
|
+ my_time_t tmp_time=fec_par.timeout+first_packet_time;
|
|
|
its.it_value.tv_sec=tmp_time/1000000llu;
|
|
|
its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu;
|
|
|
//timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
|
|
@@ -191,11 +196,11 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/)
|
|
|
|
|
|
//ev_timer_set(loop,)
|
|
|
}
|
|
|
- if(fec_mode==0)//for type 0 use blob
|
|
|
+ if(fec_par.mode==0)//for type 0 use blob
|
|
|
{
|
|
|
assert(blob_encode.input(s,len)==0);
|
|
|
}
|
|
|
- else if(fec_mode==1)//for tpe 1 use input_buf and counter
|
|
|
+ else if(fec_par.mode==1)//for tpe 1 use input_buf and counter
|
|
|
{
|
|
|
mylog(log_trace,"counter=%d\n",counter);
|
|
|
assert(len<=65535&&len>=0);
|
|
@@ -217,33 +222,28 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/)
|
|
|
}
|
|
|
int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
|
|
|
{
|
|
|
- if(counter==0&&dynamic_update_fec)
|
|
|
+ if(counter==0&&fec_par.version!=g_fec_par.version)
|
|
|
{
|
|
|
- fec_data_num=g_fec_data_num;
|
|
|
- fec_redundant_num=g_fec_redundant_num;
|
|
|
- fec_mtu=g_fec_mtu;
|
|
|
- fec_queue_len=g_fec_queue_len;
|
|
|
- fec_timeout=g_fec_timeout;
|
|
|
- fec_mode=g_fec_mode;
|
|
|
+ fec_par.clone(g_fec_par);
|
|
|
}
|
|
|
|
|
|
int about_to_fec=0;
|
|
|
int delayed_append=0;
|
|
|
//int counter_back=counter;
|
|
|
- assert(fec_mode==0||fec_mode==1);
|
|
|
+ assert(fec_par.mode==0||fec_par.mode==1);
|
|
|
|
|
|
- if(fec_mode==0&& s!=0 &&counter==0)
|
|
|
+ if(fec_par.mode==0&& s!=0 &&counter==0)
|
|
|
{
|
|
|
- int out_len=blob_encode.get_shard_len(fec_data_num,len);
|
|
|
- if(out_len>fec_mtu)
|
|
|
+ int out_len=blob_encode.get_shard_len(fec_par.get_tail().x,len);
|
|
|
+ if(out_len>fec_par.mtu)
|
|
|
{
|
|
|
- mylog(log_warn,"message too long ori_len=%d out_len=%d fec_mtu=%d,ignored\n",len,out_len,fec_mtu);
|
|
|
+ mylog(log_warn,"message too long ori_len=%d out_len=%d fec_mtu=%d,ignored\n",len,out_len,fec_par.mtu);
|
|
|
return -1;
|
|
|
}
|
|
|
}
|
|
|
- if(fec_mode==1&&s!=0&&len>fec_mtu)
|
|
|
+ if(fec_par.mode==1&&s!=0&&len>fec_par.mtu)
|
|
|
{
|
|
|
- mylog(log_warn,"mode==1,message len=%d,len>fec_mtu,fec_mtu=%d,packet may not be delivered\n",len,fec_mtu);
|
|
|
+ mylog(log_warn,"mode==1,message len=%d,len>fec_mtu,fec_mtu=%d,packet may not be delivered\n",len,fec_par.mtu);
|
|
|
//return -1;
|
|
|
}
|
|
|
if(s==0&&counter==0)
|
|
@@ -253,10 +253,10 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
|
|
|
}
|
|
|
if(s==0) about_to_fec=1;//now
|
|
|
|
|
|
- if(fec_mode==0&& blob_encode.get_shard_len(fec_data_num,len)>fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet
|
|
|
+ if(fec_par.mode==0&& blob_encode.get_shard_len(fec_par.get_tail().x,len)>fec_par.mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet
|
|
|
|
|
|
- if(fec_mode==0) assert(counter<fec_queue_len);//counter will never equal fec_pending_num,if that happens fec should already been done.
|
|
|
- if(fec_mode==1) assert(counter<fec_data_num);
|
|
|
+ if(fec_par.mode==0) assert(counter<fec_par.queue_len);//counter will never equal fec_pending_num,if that happens fec should already been done.
|
|
|
+ if(fec_par.mode==1) assert(counter<fec_par.get_tail().x);
|
|
|
|
|
|
|
|
|
if(s!=0&&!delayed_append)
|
|
@@ -264,9 +264,9 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
|
|
|
append(s,len);
|
|
|
}
|
|
|
|
|
|
- if(fec_mode==0&& counter==fec_queue_len) about_to_fec=1;
|
|
|
+ if(fec_par.mode==0&& counter==fec_par.queue_len) about_to_fec=1;
|
|
|
|
|
|
- if(fec_mode==1&& counter==fec_data_num) about_to_fec=1;
|
|
|
+ if(fec_par.mode==1&& counter==fec_par.get_tail().x) about_to_fec=1;
|
|
|
|
|
|
|
|
|
if(about_to_fec)
|
|
@@ -284,39 +284,46 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
|
|
|
int actual_data_num;
|
|
|
int actual_redundant_num;
|
|
|
|
|
|
- if(fec_mode==0)
|
|
|
+ if(fec_par.mode==0)
|
|
|
{
|
|
|
|
|
|
- actual_data_num=fec_data_num;
|
|
|
- actual_redundant_num=fec_redundant_num;
|
|
|
+ actual_data_num=fec_par.get_tail().x;
|
|
|
+ actual_redundant_num=fec_par.get_tail().y;
|
|
|
|
|
|
if(short_packet_optimize)
|
|
|
{
|
|
|
- u32_t best_len=(blob_encode.get_shard_len(fec_data_num,0)+header_overhead)*(fec_data_num+fec_redundant_num);
|
|
|
- int best_data_num=fec_data_num;
|
|
|
+ u32_t best_len=(blob_encode.get_shard_len(actual_data_num,0)+header_overhead)*(actual_data_num+actual_redundant_num);
|
|
|
+ int best_data_num=actual_data_num;
|
|
|
+ assert(actual_data_num<=fec_par.rs_cnt);
|
|
|
for(int i=1;i<actual_data_num;i++)
|
|
|
{
|
|
|
- u32_t shard_len=blob_encode.get_shard_len(i,0);
|
|
|
- if(shard_len>(u32_t)fec_mtu) continue;
|
|
|
-
|
|
|
- u32_t new_len=(shard_len+header_overhead)*(i+fec_redundant_num);
|
|
|
+ assert(fec_par.rs_par[i-1].x==i);
|
|
|
+ int tmp_x=fec_par.rs_par[i-1].x;
|
|
|
+ int tmp_y=fec_par.rs_par[i-1].y;
|
|
|
+ assert(tmp_x==i);
|
|
|
+ u32_t shard_len=blob_encode.get_shard_len(tmp_x,0);
|
|
|
+ if(shard_len>(u32_t)fec_par.mtu) continue;
|
|
|
+
|
|
|
+ u32_t new_len=(shard_len+header_overhead)*(tmp_x+tmp_y);
|
|
|
if(new_len<best_len)
|
|
|
{
|
|
|
best_len=new_len;
|
|
|
- best_data_num=i;
|
|
|
+ best_data_num=tmp_x;
|
|
|
}
|
|
|
}
|
|
|
actual_data_num=best_data_num;
|
|
|
- actual_redundant_num=fec_redundant_num;
|
|
|
- mylog(log_trace,"actual_data_num=%d actual_redundant_num=%d\n",best_data_num,fec_redundant_num);
|
|
|
+ assert(best_data_num>=1&&best_data_num<=fec_par.rs_cnt);
|
|
|
+ actual_redundant_num=fec_par.rs_par[best_data_num-1].y;
|
|
|
+ mylog(log_trace,"actual_data_num=%d actual_redundant_num=%d\n",actual_data_num,actual_redundant_num);
|
|
|
}
|
|
|
|
|
|
assert(blob_encode.output(actual_data_num,blob_output,fec_len)==0);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ assert(counter<=fec_par.rs_cnt);
|
|
|
actual_data_num=counter;
|
|
|
- actual_redundant_num=fec_redundant_num;
|
|
|
+ actual_redundant_num=fec_par.rs_par[counter-1].y;
|
|
|
|
|
|
for(int i=0;i<counter;i++)
|
|
|
{
|
|
@@ -334,8 +341,8 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
|
|
|
|
|
|
write_u32(input_buf[i] + tmp_idx, seq);
|
|
|
tmp_idx += sizeof(u32_t);
|
|
|
- input_buf[i][tmp_idx++] = (unsigned char) fec_mode;
|
|
|
- if (fec_mode == 1 && i < actual_data_num)
|
|
|
+ input_buf[i][tmp_idx++] = (unsigned char) fec_par.mode;
|
|
|
+ if (fec_par.mode == 1 && i < actual_data_num)
|
|
|
{
|
|
|
input_buf[i][tmp_idx++] = (unsigned char) 0;
|
|
|
input_buf[i][tmp_idx++] = (unsigned char) 0;
|
|
@@ -348,7 +355,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
|
|
|
|
|
|
tmp_output_buf[i]=input_buf[i]+tmp_idx; //////caution ,trick here.
|
|
|
|
|
|
- if(fec_mode==0)
|
|
|
+ if(fec_par.mode==0)
|
|
|
{
|
|
|
output_len[i]=tmp_idx+fec_len;
|
|
|
if(i<actual_data_num)
|
|
@@ -419,7 +426,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
|
|
|
ev_timer_stop(loop, &timer);
|
|
|
//timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
|
|
|
|
|
|
- if(encode_fast_send&&fec_mode==1)
|
|
|
+ if(encode_fast_send&&fec_par.mode==1)
|
|
|
{
|
|
|
int packet_to_send[max_fec_packet_num+5]={0};
|
|
|
int packet_to_send_counter=0;
|
|
@@ -443,7 +450,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- if(encode_fast_send&&s!=0&&fec_mode==1)
|
|
|
+ if(encode_fast_send&&s!=0&&fec_par.mode==1)
|
|
|
{
|
|
|
assert(counter>=1);
|
|
|
assert(counter<=255);
|
|
@@ -458,7 +465,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
|
|
|
write_u32(input_buf[input_buf_idx]+tmp_idx,seq);
|
|
|
tmp_idx+=sizeof(u32_t);
|
|
|
|
|
|
- input_buf[input_buf_idx][tmp_idx++]=(unsigned char)fec_mode;
|
|
|
+ input_buf[input_buf_idx][tmp_idx++]=(unsigned char)fec_par.mode;
|
|
|
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0;
|
|
|
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0;
|
|
|
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)((u32_t)input_buf_idx);
|
|
@@ -480,7 +487,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
|
|
|
|
|
|
if(s!=0&&delayed_append)
|
|
|
{
|
|
|
- assert(fec_mode!=1);
|
|
|
+ assert(fec_par.mode!=1);
|
|
|
append(s,len);
|
|
|
}
|
|
|
|