main.cpp 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559
  1. #include "common.h"
  2. #include "log.h"
  3. #include "git_version.h"
  4. #include "lib/rs.h"
  5. #include "packet.h"
  6. #include "connection.h"
  7. #include "fd_manager.h"
  8. #include "delay_manager.h"
  9. #include "fec_manager.h"
  10. using namespace std;
  11. typedef unsigned long long u64_t; //this works on most platform,avoid using the PRId64
  12. typedef long long i64_t;
  13. typedef unsigned int u32_t;
  14. typedef int i32_t;
  15. //int dup_num=1;
  16. //int dup_delay_min=20; //0.1ms
  17. //int dup_delay_max=20;
  18. //int random_number_fd=-1;
  19. int mtu_warn=1350;
  20. int disable_mtu_warn=0;
  21. int disable_fec=0;
  22. int debug_force_flush_fec=0;
  23. int fec_data_num=20;
  24. int fec_redundant_num=10;
  25. int fec_mtu=1250;
  26. int fec_pending_num=200;
  27. int fec_pending_time=10*1000; //10ms
  28. int fec_type=0;
  29. int jitter_min=0*1000;
  30. int jitter_max=0*1000;
  31. int output_interval_min=0*1000;
  32. int output_interval_max=0*1000;
  33. int fix_latency=0;
  34. u32_t local_ip_uint32,remote_ip_uint32=0;
  35. char local_ip[100], remote_ip[100];
  36. int local_port = -1, remote_port = -1;
  37. u64_t last_report_time=0;
  38. int report_interval=0;
  39. conn_manager_t conn_manager;
  40. delay_manager_t delay_manager;
  41. fd_manager_t fd_manager;
  42. int time_mono_test=1;
  43. const int disable_conv_clear=0;
  44. int socket_buf_size=1024*1024;
  45. int VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV;
  46. int init_listen_socket()
  47. {
  48. local_listen_fd =socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  49. int yes = 1;
  50. //setsockopt(udp_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
  51. struct sockaddr_in local_me={0};
  52. socklen_t slen = sizeof(sockaddr_in);
  53. //memset(&local_me, 0, sizeof(local_me));
  54. local_me.sin_family = AF_INET;
  55. local_me.sin_port = htons(local_port);
  56. local_me.sin_addr.s_addr = local_ip_uint32;
  57. if (bind(local_listen_fd, (struct sockaddr*) &local_me, slen) == -1) {
  58. mylog(log_fatal,"socket bind error\n");
  59. //perror("socket bind error");
  60. myexit(1);
  61. }
  62. setnonblocking(local_listen_fd);
  63. set_buf_size(local_listen_fd,socket_buf_size);
  64. mylog(log_debug,"local_listen_fd=%d\n,",local_listen_fd);
  65. return 0;
  66. }
  67. int new_connected_socket(int &fd,u32_t ip,int port)
  68. {
  69. char ip_port[40];
  70. sprintf(ip_port,"%s:%d",my_ntoa(ip),port);
  71. struct sockaddr_in remote_addr_in = { 0 };
  72. socklen_t slen = sizeof(sockaddr_in);
  73. //memset(&remote_addr_in, 0, sizeof(remote_addr_in));
  74. remote_addr_in.sin_family = AF_INET;
  75. remote_addr_in.sin_port = htons(port);
  76. remote_addr_in.sin_addr.s_addr = ip;
  77. fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  78. if (fd < 0) {
  79. mylog(log_warn, "[%s]create udp_fd error\n", ip_port);
  80. return -1;
  81. }
  82. setnonblocking(fd);
  83. set_buf_size(fd, socket_buf_size);
  84. mylog(log_debug, "[%s]created new udp_fd %d\n", ip_port, fd);
  85. int ret = connect(fd, (struct sockaddr *) &remote_addr_in, slen);
  86. if (ret != 0) {
  87. mylog(log_warn, "[%s]fd connect fail\n",ip_port);
  88. close(fd);
  89. return -1;
  90. }
  91. return 0;
  92. }
  93. int delay_send(my_time_t delay,const dest_t &dest,char *data,int len)
  94. {
  95. //int rand=random()%100;
  96. //mylog(log_info,"rand = %d\n",rand);
  97. if (dest.cook&&random_drop != 0) {
  98. if (get_true_random_number() % 10000 < (u32_t) random_drop) {
  99. return 0;
  100. }
  101. }
  102. return delay_manager.add(delay,dest,data,len);;
  103. }
  104. int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,my_time_t *&out_delay)
  105. {
  106. static my_time_t out_delay_buf[max_fec_packet_num+100]={0};
  107. //static int out_len_buf[max_fec_packet_num+100]={0};
  108. //static int counter=0;
  109. out_delay=out_delay_buf;
  110. //out_len=out_len_buf;
  111. if(disable_fec)
  112. {
  113. if(data==0) return 0;
  114. out_n=1;
  115. static char *data_static;
  116. data_static=data;
  117. static int len_static;
  118. len_static=len;
  119. out_arr=&data_static;
  120. out_len=&len_static;
  121. out_delay[0]=0;
  122. }
  123. else
  124. {
  125. //counter++;
  126. conn_info.fec_encode_manager.input(data,len);
  127. //if(counter%5==0)
  128. //conn_info.fec_encode_manager.input(0,0);
  129. //int n;
  130. //char **s_arr;
  131. //int s_len;
  132. conn_info.fec_encode_manager.output(out_n,out_arr,out_len);
  133. if(out_n>0)
  134. {
  135. my_time_t common_latency=0;
  136. my_time_t first_packet_time=conn_info.fec_encode_manager.get_first_packet_time();
  137. my_time_t current_time=get_current_time_us();
  138. if(fix_latency==1&&first_packet_time!=0)
  139. {
  140. my_time_t tmp;
  141. if((my_time_t)fec_pending_time >=(current_time - first_packet_time))
  142. {
  143. tmp=(my_time_t)fec_pending_time-(current_time - first_packet_time);
  144. }
  145. else tmp=0;
  146. common_latency+=tmp;
  147. }
  148. common_latency+=random_between(jitter_min,jitter_max);
  149. out_delay_buf[0]=common_latency;
  150. for(int i=1;i<out_n;i++)
  151. {
  152. out_delay_buf[i]=out_delay_buf[i-1]+ (my_time_t)( random_between(output_interval_min,output_interval_max)/(out_n-1) );
  153. }
  154. }
  155. }
  156. mylog(log_trace,"from_normal_to_fec input_len=%d,output_n=%d\n",len,out_n);
  157. if(out_n>0)
  158. {
  159. log_bare(log_trace,"seq= %u ",read_u32(out_arr[0]));
  160. }
  161. for(int i=0;i<out_n;i++)
  162. {
  163. log_bare(log_trace,"%d ",out_len[i]);
  164. }
  165. log_bare(log_trace,"\n");
  166. //for(int i=0;i<n;i++)
  167. //{
  168. //delay_send(0,dest,s_arr[i],s_len);
  169. //}
  170. //delay_send(0,dest,data,len);
  171. //delay_send(1000*1000,dest,data,len);
  172. return 0;
  173. }
  174. int from_fec_to_normal(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,my_time_t *&out_delay)
  175. {
  176. static my_time_t out_delay_buf[max_fec_pending_packet_num+100]={0};
  177. out_delay=out_delay_buf;
  178. if(disable_fec)
  179. {
  180. if(data==0) return 0;
  181. out_n=1;
  182. static char *data_static;
  183. data_static=data;
  184. static int len_static;
  185. len_static=len;
  186. out_arr=&data_static;
  187. out_len=&len_static;
  188. out_delay[0]=0;
  189. }
  190. else
  191. {
  192. conn_info.fec_decode_manager.input(data,len);
  193. //int n;char ** s_arr;int* len_arr;
  194. conn_info.fec_decode_manager.output(out_n,out_arr,out_len);
  195. for(int i=0;i<out_n;i++)
  196. {
  197. out_delay_buf[i]=0;
  198. }
  199. }
  200. mylog(log_trace,"from_fec_to_normal input_len=%d,output_n=%d,input_seq=%u\n",len,out_n,read_u32(data));
  201. // printf("<n:%d>",n);
  202. /*
  203. for(int i=0;i<n;i++)
  204. {
  205. delay_send(0,dest,s_arr[i],len_arr[i]);
  206. //s_arr[i][len_arr[i]]=0;
  207. //printf("<%s>\n",s_arr[i]);
  208. }*/
  209. //my_send(dest,data,len);
  210. return 0;
  211. }
  212. int client_event_loop()
  213. {
  214. //char buf[buf_len];
  215. int i, j, k;int ret;
  216. int yes = 1;
  217. int epoll_fd;
  218. int remote_fd;
  219. fd64_t remote_fd64;
  220. conn_info_t *conn_info_p=new conn_info_t;
  221. conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack
  222. //conn_info.conv_manager.reserve();
  223. conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type);
  224. init_listen_socket();
  225. epoll_fd = epoll_create1(0);
  226. assert(epoll_fd>0);
  227. const int max_events = 4096;
  228. struct epoll_event ev, events[max_events];
  229. if (epoll_fd < 0) {
  230. mylog(log_fatal,"epoll return %d\n", epoll_fd);
  231. myexit(-1);
  232. }
  233. ev.events = EPOLLIN;
  234. ev.data.u64 = local_listen_fd;
  235. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, local_listen_fd, &ev);
  236. if (ret!=0) {
  237. mylog(log_fatal,"add udp_listen_fd error\n");
  238. myexit(-1);
  239. }
  240. assert(new_connected_socket(remote_fd,remote_ip_uint32,remote_port)==0);
  241. remote_fd64=fd_manager.create(remote_fd);
  242. mylog(log_debug,"remote_fd64=%llu\n",remote_fd64);
  243. ev.events = EPOLLIN;
  244. ev.data.u64 = remote_fd64;
  245. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, remote_fd, &ev);
  246. if (ret!= 0) {
  247. mylog(log_fatal,"add raw_fd error\n");
  248. myexit(-1);
  249. }
  250. ev.events = EPOLLIN;
  251. ev.data.u64 = delay_manager.get_timer_fd();
  252. mylog(log_debug,"delay_manager.get_timer_fd()=%d\n",delay_manager.get_timer_fd());
  253. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
  254. if (ret!= 0) {
  255. mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
  256. myexit(-1);
  257. }
  258. u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64();
  259. ev.events = EPOLLIN;
  260. ev.data.u64 = fd64;
  261. mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64());
  262. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev);
  263. if (ret!= 0) {
  264. mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n");
  265. myexit(-1);
  266. }
  267. //my_timer_t timer;
  268. conn_info.timer.add_fd_to_epoll(epoll_fd);
  269. conn_info.timer.set_timer_repeat_us(timer_interval*1000);
  270. mylog(log_debug,"conn_info.timer.get_timer_fd()=%d\n",conn_info.timer.get_timer_fd());
  271. while(1)////////////////////////
  272. {
  273. if(about_to_exit) myexit(0);
  274. int nfds = epoll_wait(epoll_fd, events, max_events, 180 * 1000);
  275. if (nfds < 0) { //allow zero
  276. if(errno==EINTR )
  277. {
  278. mylog(log_info,"epoll interrupted by signal\n");
  279. myexit(0);
  280. }
  281. else
  282. {
  283. mylog(log_fatal,"epoll_wait return %d\n", nfds);
  284. myexit(-1);
  285. }
  286. }
  287. int idx;
  288. for (idx = 0; idx < nfds; ++idx) {
  289. if(events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd())
  290. {
  291. uint64_t value;
  292. read(conn_info.timer.get_timer_fd(), &value, 8);
  293. conn_info.conv_manager.clear_inactive();
  294. mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n");
  295. if(debug_force_flush_fec)
  296. {
  297. int out_n;char **out_arr;int *out_len;my_time_t *out_delay;
  298. dest_t dest;
  299. dest.type=type_fd64;
  300. dest.inner.fd64=remote_fd64;
  301. dest.cook=1;
  302. from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
  303. for(int i=0;i<out_n;i++)
  304. {
  305. delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
  306. }
  307. }
  308. }
  309. else if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64())
  310. {
  311. char data[buf_len];
  312. int data_len;
  313. ip_port_t ip_port;
  314. u32_t conv;
  315. int out_n;char **out_arr;int *out_len;my_time_t *out_delay;
  316. dest_t dest;
  317. dest.type=type_fd64;
  318. dest.inner.fd64=remote_fd64;
  319. dest.cook=1;
  320. if(events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64())
  321. {
  322. mylog(log_trace,"events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()\n");
  323. //mylog(log_info,"timer!!!\n");
  324. uint64_t value;
  325. if(!fd_manager.exist(fd64)) //fd64 has been closed
  326. {
  327. mylog(log_trace,"!fd_manager.exist(fd64)");
  328. continue;
  329. }
  330. if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8)
  331. {
  332. mylog(log_trace,"(ret=read(fd_manager.to_fd(fd64), &value, 8))!=8,ret=%d\n",ret);
  333. continue;
  334. }
  335. if(value==0)
  336. {
  337. mylog(log_debug,"value==0\n");
  338. continue;
  339. }
  340. assert(value==1);
  341. from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
  342. //from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
  343. }
  344. else//events[idx].data.u64 == (u64_t)local_listen_fd
  345. {
  346. mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n");
  347. struct sockaddr_in udp_new_addr_in={0};
  348. socklen_t udp_new_addr_len = sizeof(sockaddr_in);
  349. if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0,
  350. (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) {
  351. mylog(log_error,"recv_from error,this shouldnt happen at client\n");
  352. myexit(1);
  353. };
  354. if(!disable_mtu_warn&&data_len>=mtu_warn)
  355. {
  356. mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  357. }
  358. mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
  359. ntohs(udp_new_addr_in.sin_port),data_len);
  360. ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
  361. ip_port.port=ntohs(udp_new_addr_in.sin_port);
  362. u64_t u64=ip_port.to_u64();
  363. if(!conn_info.conv_manager.is_u64_used(u64))
  364. {
  365. if(conn_info.conv_manager.get_size() >=max_conv_num)
  366. {
  367. mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n");
  368. continue;
  369. }
  370. conv=conn_info.conv_manager.get_new_conv();
  371. conn_info.conv_manager.insert_conv(conv,u64);
  372. mylog(log_info,"new packet from %s:%d,conv_id=%x\n",inet_ntoa(udp_new_addr_in.sin_addr),ntohs(udp_new_addr_in.sin_port),conv);
  373. }
  374. else
  375. {
  376. conv=conn_info.conv_manager.find_conv_by_u64(u64);
  377. mylog(log_trace,"conv=%d\n",conv);
  378. }
  379. conn_info.conv_manager.update_active_time(conv);
  380. char * new_data;
  381. int new_len;
  382. put_conv(conv,data,data_len,new_data,new_len);
  383. mylog(log_trace,"data_len=%d new_len=%d\n",data_len,new_len);
  384. //dest.conv=conv;
  385. from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay);
  386. }
  387. mylog(log_trace,"out_n=%d\n",out_n);
  388. for(int i=0;i<out_n;i++)
  389. {
  390. delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
  391. }
  392. //my_send(dest,data,data_len);
  393. }
  394. else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) {
  395. uint64_t value;
  396. read(delay_manager.get_timer_fd(), &value, 8);
  397. mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n");
  398. //printf("<timerfd_triggered, %d>",delay_mp.size());
  399. //fflush(stdout);
  400. }
  401. else if(events[idx].data.u64>u32_t(-1) )
  402. {
  403. char data[buf_len];
  404. if(!fd_manager.exist(events[idx].data.u64)) //fd64 has been closed
  405. {
  406. mylog(log_trace,"!fd_manager.exist(events[idx].data.u64)");
  407. continue;
  408. }
  409. assert(events[idx].data.u64==remote_fd64);
  410. int fd=fd_manager.to_fd(remote_fd64);
  411. int data_len =recv(fd,data,max_data_len,0);
  412. mylog(log_trace, "received data from udp fd %d, len=%d\n", remote_fd,data_len);
  413. if(data_len<0)
  414. {
  415. if(errno==ECONNREFUSED)
  416. {
  417. //conn_manager.clear_list.push_back(udp_fd);
  418. mylog(log_debug, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
  419. }
  420. mylog(log_warn, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
  421. continue;
  422. }
  423. if(!disable_mtu_warn&&data_len>mtu_warn)
  424. {
  425. mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  426. }
  427. if(de_cook(data,data_len)!=0)
  428. {
  429. mylog(log_debug,"de_cook error");
  430. continue;
  431. }
  432. int out_n;char **out_arr;int *out_len;my_time_t *out_delay;
  433. from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay);
  434. mylog(log_trace,"out_n=%d\n",out_n);
  435. for(int i=0;i<out_n;i++)
  436. {
  437. u32_t conv;
  438. char *new_data;
  439. int new_len;
  440. if(get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0)
  441. {
  442. mylog(log_debug,"get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0");
  443. continue;
  444. }
  445. if(!conn_info.conv_manager.is_conv_used(conv))
  446. {
  447. mylog(log_trace,"!conn_info.conv_manager.is_conv_used(conv)");
  448. }
  449. conn_info.conv_manager.update_active_time(conv);
  450. u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv);
  451. dest_t dest;
  452. dest.inner.ip_port.from_u64(u64);
  453. dest.type=type_ip_port;
  454. //dest.conv=conv;
  455. delay_send(out_delay[i],dest,new_data,new_len);
  456. }
  457. //mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s());
  458. }
  459. else
  460. {
  461. mylog(log_fatal,"unknown fd,this should never happen\n");
  462. myexit(-1);
  463. }
  464. }
  465. delay_manager.check();
  466. }
  467. return 0;
  468. }
  469. int server_event_loop()
  470. {
  471. //char buf[buf_len];
  472. int i, j, k;int ret;
  473. int yes = 1;
  474. int epoll_fd;
  475. int remote_fd;
  476. // conn_info_t conn_info;
  477. init_listen_socket();
  478. epoll_fd = epoll_create1(0);
  479. assert(epoll_fd>0);
  480. const int max_events = 4096;
  481. struct epoll_event ev, events[max_events];
  482. if (epoll_fd < 0) {
  483. mylog(log_fatal,"epoll return %d\n", epoll_fd);
  484. myexit(-1);
  485. }
  486. ev.events = EPOLLIN;
  487. ev.data.u64 = local_listen_fd;
  488. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, local_listen_fd, &ev);
  489. if (ret!=0) {
  490. mylog(log_fatal,"add udp_listen_fd error\n");
  491. myexit(-1);
  492. }
  493. ev.events = EPOLLIN;
  494. ev.data.u64 = delay_manager.get_timer_fd();
  495. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
  496. if (ret!= 0) {
  497. mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
  498. myexit(-1);
  499. }
  500. mylog(log_debug," delay_manager.get_timer_fd() =%d\n", delay_manager.get_timer_fd());
  501. mylog(log_info,"now listening at %s:%d\n",my_ntoa(local_ip_uint32),local_port);
  502. my_timer_t timer;
  503. timer.add_fd_to_epoll(epoll_fd);
  504. timer.set_timer_repeat_us(timer_interval*1000);
  505. mylog(log_debug," timer.get_timer_fd() =%d\n",timer.get_timer_fd());
  506. while(1)////////////////////////
  507. {
  508. if(about_to_exit) myexit(0);
  509. int nfds = epoll_wait(epoll_fd, events, max_events, 180 * 1000);
  510. if (nfds < 0) { //allow zero
  511. if(errno==EINTR )
  512. {
  513. mylog(log_info,"epoll interrupted by signal\n");
  514. myexit(0);
  515. }
  516. else
  517. {
  518. mylog(log_fatal,"epoll_wait return %d\n", nfds);
  519. myexit(-1);
  520. }
  521. }
  522. int idx;
  523. for (idx = 0; idx < nfds; ++idx)
  524. {
  525. /*
  526. if ((events[idx].data.u64 ) == (u64_t)timer_fd)
  527. {
  528. conn_manager.clear_inactive();
  529. u64_t dummy;
  530. read(timer_fd, &dummy, 8);
  531. //current_time_rough=get_current_time();
  532. }
  533. else */
  534. if(events[idx].data.u64==(u64_t)timer.get_timer_fd())
  535. {
  536. uint64_t value;
  537. read(timer.get_timer_fd(), &value, 8);
  538. conn_manager.clear_inactive();
  539. mylog(log_trace,"events[idx].data.u64==(u64_t)timer.get_timer_fd()\n");
  540. //conn_info.conv_manager.clear_inactive();
  541. }
  542. else if (events[idx].data.u64 == (u64_t)local_listen_fd)
  543. {
  544. mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n");
  545. //int recv_len;
  546. char data[buf_len];
  547. int data_len;
  548. struct sockaddr_in udp_new_addr_in={0};
  549. socklen_t udp_new_addr_len = sizeof(sockaddr_in);
  550. if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0,
  551. (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) {
  552. mylog(log_error,"recv_from error,this shouldnt happen at client\n");
  553. myexit(1);
  554. };
  555. mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
  556. ntohs(udp_new_addr_in.sin_port),data_len);
  557. if(!disable_mtu_warn&&data_len>=mtu_warn)///////////////////////delete this for type 0 in furture
  558. {
  559. mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  560. }
  561. if(de_cook(data,data_len)!=0)
  562. {
  563. mylog(log_debug,"de_cook error");
  564. continue;
  565. }
  566. ip_port_t ip_port;
  567. ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
  568. ip_port.port=ntohs(udp_new_addr_in.sin_port);
  569. mylog(log_trace,"ip_port= %s\n",ip_port.to_s());
  570. if(!conn_manager.exist(ip_port))
  571. {
  572. if(conn_manager.mp.size() >=max_conn_num)
  573. {
  574. mylog(log_warn,"new connection %s ignored bc max_conn_num exceed\n",ip_port.to_s());
  575. continue;
  576. }
  577. conn_manager.insert(ip_port);
  578. conn_info_t &conn_info=conn_manager.find(ip_port);
  579. conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type);
  580. //conn_info.conv_manager.reserve(); //already reserved in constructor
  581. u64_t fec_fd64=conn_info.fec_encode_manager.get_timer_fd64();
  582. mylog(log_debug,"fec_fd64=%llu\n",fec_fd64);
  583. ev.events = EPOLLIN;
  584. ev.data.u64 = fec_fd64;
  585. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fec_fd64), &ev);
  586. fd_manager.get_info(fec_fd64).ip_port=ip_port;
  587. conn_info.timer.add_fd64_to_epoll(epoll_fd);
  588. conn_info.timer.set_timer_repeat_us(timer_interval*1000);
  589. mylog(log_debug,"conn_info.timer.get_timer_fd64()=%llu\n",conn_info.timer.get_timer_fd64());
  590. u64_t timer_fd64=conn_info.timer.get_timer_fd64();
  591. fd_manager.get_info(timer_fd64).ip_port=ip_port;
  592. mylog(log_info,"new connection from %s\n",ip_port.to_s());
  593. }
  594. conn_info_t &conn_info=conn_manager.find(ip_port);
  595. conn_info.update_active_time();
  596. int out_n;char **out_arr;int *out_len;my_time_t *out_delay;
  597. from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay);
  598. mylog(log_trace,"out_n= %d\n",out_n);
  599. for(int i=0;i<out_n;i++)
  600. {
  601. u32_t conv;
  602. char *new_data;
  603. int new_len;
  604. if(get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0)
  605. {
  606. mylog(log_debug,"get_conv failed");
  607. continue;
  608. }
  609. /*
  610. id_t tmp_conv_id;
  611. memcpy(&tmp_conv_id,&data_[0],sizeof(tmp_conv_id));
  612. tmp_conv_id=ntohl(tmp_conv_id);*/
  613. if (!conn_info.conv_manager.is_conv_used(conv))
  614. {
  615. if(conn_info.conv_manager.get_size() >=max_conv_num)
  616. {
  617. mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n");
  618. continue;
  619. }
  620. int new_udp_fd;
  621. ret=new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port);
  622. if (ret != 0) {
  623. mylog(log_warn, "[%s]new_connected_socket failed\n",ip_port.to_s());
  624. continue;
  625. }
  626. fd64_t fd64 = fd_manager.create(new_udp_fd);
  627. ev.events = EPOLLIN;
  628. ev.data.u64 = fd64;
  629. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_udp_fd, &ev);
  630. conn_info.conv_manager.insert_conv(conv, fd64);
  631. fd_manager.get_info(fd64).ip_port=ip_port;
  632. mylog(log_info,"[%s]new conv %x,fd %d created,fd64=%llu\n",ip_port.to_s(),conv,new_udp_fd,fd64);
  633. //assert(!conn_manager.exist_fd64(fd64));
  634. //conn_manager.insert_fd64(fd64,ip_port);
  635. }
  636. conn_info.conv_manager.update_active_time(conv);
  637. fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv);
  638. //int fd=fd_manager.fd64_to_fd(fd64);
  639. dest_t dest;
  640. dest.type=type_fd64;
  641. dest.inner.fd64=fd64;
  642. //dest.conv=conv;
  643. delay_send(out_delay[i],dest,new_data,new_len);
  644. }
  645. }
  646. else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) {
  647. uint64_t value;
  648. read(delay_manager.get_timer_fd(), &value, 8);
  649. mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n");
  650. }
  651. else if (events[idx].data.u64 >u32_t(-1))
  652. {
  653. char data[buf_len];
  654. int data_len;
  655. u32_t conv;
  656. fd64_t fd64=events[idx].data.u64;
  657. mylog(log_trace,"events[idx].data.u64 >u32_t(-1),%llu\n",(u64_t)events[idx].data.u64);
  658. if(!fd_manager.exist(fd64)) //fd64 has been closed
  659. {
  660. mylog(log_trace,"!fd_manager.exist(fd64)\n");
  661. continue;
  662. }
  663. assert(fd_manager.exist_info(fd64));
  664. ip_port_t ip_port=fd_manager.get_info(fd64).ip_port;
  665. assert(conn_manager.exist(ip_port));
  666. conn_info_t &conn_info=conn_manager.find(ip_port);
  667. //conn_info.update_active_time(); //cant put it here
  668. int out_n=-2;char **out_arr;int *out_len;my_time_t *out_delay;
  669. dest_t dest;
  670. dest.type=type_ip_port;
  671. //dest.conv=conv;
  672. dest.inner.ip_port=ip_port;
  673. dest.cook=1;
  674. if(fd64==conn_info.fec_encode_manager.get_timer_fd64())
  675. {
  676. //mylog(log_infol,"timer!!!\n");
  677. uint64_t value;
  678. if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8)
  679. {
  680. mylog(log_trace,"fd_manager.to_fd(fd64), &value, 8)!=8 ,%d\n",ret);
  681. continue;
  682. }
  683. if(value==0)
  684. {
  685. mylog(log_trace,"value==0\n");
  686. continue;
  687. }
  688. assert(value==1);
  689. from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
  690. }
  691. else if(fd64==conn_info.timer.get_timer_fd64())
  692. {
  693. uint64_t value;
  694. read(conn_info.timer.get_timer_fd(), &value, 8);
  695. conn_info.conv_manager.clear_inactive();
  696. if(debug_force_flush_fec)
  697. {
  698. from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
  699. }
  700. continue;
  701. }
  702. else
  703. {
  704. assert(conn_info.conv_manager.is_u64_used(fd64));
  705. conv=conn_info.conv_manager.find_conv_by_u64(fd64);
  706. conn_info.conv_manager.update_active_time(conv);
  707. conn_info.update_active_time();
  708. int fd=fd_manager.to_fd(fd64);
  709. data_len=recv(fd,data,max_data_len,0);
  710. mylog(log_trace,"received a packet from udp_fd,len:%d,conv=%d\n",data_len,conv);
  711. if(data_len<0)
  712. {
  713. mylog(log_debug,"udp fd,recv_len<0 continue,%s\n",strerror(errno));
  714. continue;
  715. }
  716. if(!disable_mtu_warn&&data_len>=mtu_warn)
  717. {
  718. mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  719. }
  720. char * new_data;
  721. int new_len;
  722. put_conv(conv,data,data_len,new_data,new_len);
  723. from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay);
  724. }
  725. mylog(log_trace,"out_n=%d\n",out_n);
  726. for(int i=0;i<out_n;i++)
  727. {
  728. delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
  729. }
  730. //mylog(log_trace,"[%s] send packet\n",ip_port.to_s());
  731. }
  732. else
  733. {
  734. mylog(log_fatal,"unknown fd,this should never happen\n");
  735. myexit(-1);
  736. }
  737. }
  738. delay_manager.check();
  739. }
  740. return 0;
  741. }
  742. int unit_test()
  743. {
  744. int i,j,k;
  745. void *code=fec_new(3,6);
  746. char arr[6][100]=
  747. {
  748. "aaa","bbb","ccc"
  749. ,"ddd","eee","fff"
  750. };
  751. char *data[6];
  752. for(i=0;i<6;i++)
  753. {
  754. data[i]=arr[i];
  755. }
  756. rs_encode2(3,6,data,3);
  757. //printf("%d %d",(int)(unsigned char)arr[5][0],(int)('a'^'b'^'c'^'d'^'e'));
  758. for(i=0;i<6;i++)
  759. {
  760. printf("<%s>",data[i]);
  761. }
  762. data[0]=0;
  763. //data[1]=0;
  764. //data[5]=0;
  765. int ret=rs_decode2(3,6,data,3);
  766. printf("ret:%d\n",ret);
  767. for(i=0;i<6;i++)
  768. {
  769. printf("<%s>",data[i]);
  770. }
  771. fec_free(code);
  772. char arr2[6][100]=
  773. {
  774. "aaa11111","","ccc333333333"
  775. ,"ddd444","eee5555","ff6666"
  776. };
  777. blob_encode_t blob_encode;
  778. for(int i=0;i<6;i++)
  779. blob_encode.input(arr2[i],strlen(arr2[i]));
  780. char **output;
  781. int shard_len;
  782. blob_encode.output(7,output,shard_len);
  783. printf("<shard_len:%d>",shard_len);
  784. blob_decode_t blob_decode;
  785. for(int i=0;i<7;i++)
  786. {
  787. blob_decode.input(output[i],shard_len);
  788. }
  789. char **decode_output;
  790. int * len_arr;
  791. int num;
  792. ret=blob_decode.output(num,decode_output,len_arr);
  793. printf("<num:%d,ret:%d>\n",num,ret);
  794. for(int i=0;i<num;i++)
  795. {
  796. char buf[1000]={0};
  797. memcpy(buf,decode_output[i],len_arr[i]);
  798. printf("<%d:%s>",len_arr[i],buf);
  799. }
  800. printf("\n");
  801. static fec_encode_manager_t fec_encode_manager;
  802. static fec_decode_manager_t fec_decode_manager;
  803. {
  804. string a = "11111";
  805. string b = "22";
  806. string c = "33333333";
  807. fec_encode_manager.input((char *) a.c_str(), a.length());
  808. fec_encode_manager.input((char *) b.c_str(), b.length());
  809. fec_encode_manager.input((char *) c.c_str(), c.length());
  810. fec_encode_manager.input(0, 0);
  811. int n;
  812. char **s_arr;
  813. int *len;
  814. fec_encode_manager.output(n,s_arr,len);
  815. printf("<n:%d,len:%d>",n,len[0]);
  816. for(int i=0;i<n;i++)
  817. {
  818. fec_decode_manager.input(s_arr[i],len[i]);
  819. }
  820. {
  821. int n;char ** s_arr;int* len_arr;
  822. fec_decode_manager.output(n,s_arr,len_arr);
  823. printf("<n:%d>",n);
  824. for(int i=0;i<n;i++)
  825. {
  826. s_arr[i][len_arr[i]]=0;
  827. printf("<%s>\n",s_arr[i]);
  828. }
  829. }
  830. }
  831. {
  832. string a = "aaaaaaa";
  833. string b = "bbbbbbbbbbbbb";
  834. string c = "ccc";
  835. fec_encode_manager.input((char *) a.c_str(), a.length());
  836. fec_encode_manager.input((char *) b.c_str(), b.length());
  837. fec_encode_manager.input((char *) c.c_str(), c.length());
  838. fec_encode_manager.input(0, 0);
  839. int n;
  840. char **s_arr;
  841. int * len;
  842. fec_encode_manager.output(n,s_arr,len);
  843. printf("<n:%d,len:%d>",n,len[0]);
  844. for(int i=0;i<n;i++)
  845. {
  846. if(i==1||i==3||i==5||i==0)
  847. fec_decode_manager.input(s_arr[i],len[i]);
  848. }
  849. {
  850. int n;char ** s_arr;int* len_arr;
  851. fec_decode_manager.output(n,s_arr,len_arr);
  852. printf("<n:%d>",n);
  853. for(int i=0;i<n;i++)
  854. {
  855. s_arr[i][len_arr[i]]=0;
  856. printf("<%s>\n",s_arr[i]);
  857. }
  858. }
  859. }
  860. for(int i=0;i<10;i++)
  861. {
  862. string a = "aaaaaaaaaaaaaaaaaaaaaaa";
  863. string b = "bbbbbbbbbbbbb";
  864. string c = "cccccccccccccccccc";
  865. printf("======\n");
  866. int n;
  867. char **s_arr;
  868. int * len;
  869. fec_decode_manager.output(n,s_arr,len);
  870. fec_encode_manager.re_init(3,2,fec_mtu,fec_pending_num,fec_pending_time,1);
  871. fec_encode_manager.input((char *) a.c_str(), a.length());
  872. fec_encode_manager.output(n,s_arr,len);
  873. assert(n==1);
  874. fec_decode_manager.input(s_arr[0],len[0]);
  875. fec_decode_manager.output(n,s_arr,len);
  876. assert(n==1);
  877. printf("%s\n",s_arr[0]);
  878. fec_encode_manager.input((char *) b.c_str(), b.length());
  879. fec_encode_manager.output(n,s_arr,len);
  880. assert(n==1);
  881. //fec_decode_manager.input(s_arr[0],len[0]);
  882. fec_encode_manager.input((char *) c.c_str(), c.length());
  883. fec_encode_manager.output(n,s_arr,len);
  884. assert(n==3);
  885. fec_decode_manager.input(s_arr[0],len[0]);
  886. //printf("n=%d\n",n);
  887. {
  888. int n;
  889. char **s_arr;
  890. int * len;
  891. fec_decode_manager.output(n,s_arr,len);
  892. assert(n==1);
  893. printf("%s\n",s_arr[0]);
  894. }
  895. fec_decode_manager.input(s_arr[1],len[1]);
  896. {
  897. int n;
  898. char **s_arr;
  899. int * len;
  900. fec_decode_manager.output(n,s_arr,len);
  901. assert(n==1);
  902. printf("n=%d\n",n);
  903. s_arr[0][len[0]]=0;
  904. printf("%s\n",s_arr[0]);
  905. }
  906. }
  907. return 0;
  908. }
  909. void print_help()
  910. {
  911. char git_version_buf[100]={0};
  912. strncpy(git_version_buf,gitversion,10);
  913. printf("UDPspeeder\n");
  914. printf("git version:%s ",git_version_buf);
  915. printf("build date:%s %s\n",__DATE__,__TIME__);
  916. printf("repository: https://github.com/wangyu-/UDPspeeder\n");
  917. printf("\n");
  918. printf("usage:\n");
  919. printf(" run as client : ./this_program -c -l local_listen_ip:local_port -r server_ip:server_port [options]\n");
  920. printf(" run as server : ./this_program -s -l server_listen_ip:server_port -r remote_ip:remote_port [options]\n");
  921. printf("\n");
  922. printf("common option,must be same on both sides:\n");
  923. printf(" -k,--key <string> key for simple xor encryption,default:\"secret key\"\n");
  924. printf("main options:\n");
  925. //printf(" -d <number> duplicated packet number, -d 0 means no duplicate. default value:0\n");
  926. //printf(" -t <number> duplicated packet delay time, unit: 0.1ms,default value:20(2ms)\n");
  927. printf(" -j <number> simulated jitter.randomly delay first packet for 0~jitter_value*0.1 ms,to\n");
  928. printf(" create simulated jitter.default value:0.do not use if you dont\n");
  929. printf(" know what it means\n");
  930. printf(" --report <number> turn on udp send/recv report,and set a time interval for reporting,unit:s\n");
  931. printf("advanced options:\n");
  932. //printf(" -t tmin:tmax simliar to -t above,but delay randomly between tmin and tmax\n");
  933. printf(" -j jmin:jmax simliar to -j above,but create jitter randomly between jmin and jmax\n");
  934. printf(" --random-drop <number> simulate packet loss ,unit:0.01%%\n");
  935. //printf(" --disable-filter disable duplicate packet filter.\n");
  936. //printf(" -m <number> max pending packets,to prevent the program from eating up all your memory,\n");
  937. //printf(" default value:0(disabled).\n");
  938. printf("other options:\n");
  939. printf(" --log-level <number> 0:never 1:fatal 2:error 3:warn \n");
  940. printf(" 4:info (default) 5:debug 6:trace\n");
  941. printf(" --log-position enable file name,function name,line number in log\n");
  942. printf(" --disable-color disable log color\n");
  943. printf(" --sock-buf <number> buf size for socket,>=10 and <=10240,unit:kbyte,default:1024\n");
  944. //printf(" -p use multi-process mode instead of epoll.very costly,only for test,do dont use\n");
  945. printf(" -h,--help print this help message\n");
  946. //printf("common options,these options must be same on both side\n");
  947. }
  948. void process_arg(int argc, char *argv[])
  949. {
  950. int is_client=0,is_server=0;
  951. int i, j, k;
  952. int opt;
  953. static struct option long_options[] =
  954. {
  955. {"log-level", required_argument, 0, 1},
  956. {"log-position", no_argument, 0, 1},
  957. {"disable-color", no_argument, 0, 1},
  958. {"disable-filter", no_argument, 0, 1},
  959. {"disable-fec", no_argument, 0, 1},
  960. {"disable-obs", no_argument, 0, 1},
  961. {"disable-xor", no_argument, 0, 1},
  962. {"fix-latency", no_argument, 0, 1},
  963. {"sock-buf", required_argument, 0, 1},
  964. {"random-drop", required_argument, 0, 1},
  965. {"report", required_argument, 0, 1},
  966. {NULL, 0, 0, 0}
  967. };
  968. int option_index = 0;
  969. if (argc == 1)
  970. {
  971. print_help();
  972. myexit( -1);
  973. }
  974. for (i = 0; i < argc; i++)
  975. {
  976. if(strcmp(argv[i],"--unit-test")==0)
  977. {
  978. unit_test();
  979. myexit(0);
  980. }
  981. }
  982. for (i = 0; i < argc; i++)
  983. {
  984. if(strcmp(argv[i],"-h")==0||strcmp(argv[i],"--help")==0)
  985. {
  986. print_help();
  987. myexit(0);
  988. }
  989. }
  990. for (i = 0; i < argc; i++)
  991. {
  992. if(strcmp(argv[i],"--log-level")==0)
  993. {
  994. if(i<argc -1)
  995. {
  996. sscanf(argv[i+1],"%d",&log_level);
  997. if(0<=log_level&&log_level<log_end)
  998. {
  999. }
  1000. else
  1001. {
  1002. log_bare(log_fatal,"invalid log_level\n");
  1003. myexit(-1);
  1004. }
  1005. }
  1006. }
  1007. if(strcmp(argv[i],"--disable-color")==0)
  1008. {
  1009. enable_log_color=0;
  1010. }
  1011. }
  1012. mylog(log_info,"argc=%d ", argc);
  1013. for (i = 0; i < argc; i++) {
  1014. log_bare(log_info, "%s ", argv[i]);
  1015. }
  1016. log_bare(log_info, "\n");
  1017. if (argc == 1)
  1018. {
  1019. print_help();
  1020. myexit(-1);
  1021. }
  1022. int no_l = 1, no_r = 1;
  1023. while ((opt = getopt_long(argc, argv, "l:r:d:t:hcsk:j:m:f:p:n:i:",long_options,&option_index)) != -1)
  1024. {
  1025. //string opt_key;
  1026. //opt_key+=opt;
  1027. switch (opt)
  1028. {
  1029. case 'k':
  1030. sscanf(optarg,"%s\n",key_string);
  1031. mylog(log_debug,"key=%s\n",key_string);
  1032. if(strlen(key_string)==0)
  1033. {
  1034. mylog(log_fatal,"key len=0??\n");
  1035. myexit(-1);
  1036. }
  1037. break;
  1038. /*
  1039. case 'm':
  1040. sscanf(optarg,"%d\n",&max_pending_packet);
  1041. if(max_pending_packet<1000)
  1042. {
  1043. mylog(log_fatal,"max_pending_packet must be >1000\n");
  1044. myexit(-1);
  1045. }
  1046. break;
  1047. */
  1048. case 'j':
  1049. if (strchr(optarg, ':') == 0)
  1050. {
  1051. int jitter;
  1052. sscanf(optarg,"%d\n",&jitter);
  1053. if(jitter<0 ||jitter>1000*100)
  1054. {
  1055. mylog(log_fatal,"jitter must be between 0 and 100,000(10 second)\n");
  1056. myexit(-1);
  1057. }
  1058. jitter_min=0;
  1059. jitter_max=jitter;
  1060. }
  1061. else
  1062. {
  1063. sscanf(optarg,"%d:%d\n",&jitter_min,&jitter_max);
  1064. if(jitter_min<0 ||jitter_max<0||jitter_min>jitter_max)
  1065. {
  1066. mylog(log_fatal," must satisfy 0<=jmin<=jmax\n");
  1067. myexit(-1);
  1068. }
  1069. }
  1070. jitter_min*=100;
  1071. jitter_max*=100;
  1072. break;
  1073. case 'i':
  1074. if (strchr(optarg, ':') == 0)
  1075. {
  1076. int output_interval=-1;
  1077. sscanf(optarg,"%d\n",&output_interval);
  1078. if(output_interval<0||output_interval>1000*100)
  1079. {
  1080. mylog(log_fatal,"output_interval must be between 0 and 100,000(10 second)\n");
  1081. myexit(-1);
  1082. }
  1083. output_interval_min=output_interval_max=output_interval;
  1084. }
  1085. else
  1086. {
  1087. sscanf(optarg,"%d:%d\n",&output_interval_min,&output_interval_max);
  1088. if(output_interval_min<0 ||output_interval_max<0||output_interval_min>output_interval_max)
  1089. {
  1090. mylog(log_fatal," must satisfy 0<=output_interval_min<=output_interval_max\n");
  1091. myexit(-1);
  1092. }
  1093. }
  1094. output_interval_min*=100;
  1095. output_interval_max*=100;
  1096. break;
  1097. case 'f':
  1098. if (strchr(optarg, ':') == 0)
  1099. {
  1100. mylog(log_fatal,"invalid format for f");
  1101. myexit(-1);
  1102. }
  1103. else
  1104. {
  1105. sscanf(optarg,"%d:%d\n",&fec_data_num,&fec_redundant_num);
  1106. if(fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>255)
  1107. {
  1108. mylog(log_fatal,"fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>255\n");
  1109. myexit(-1);
  1110. }
  1111. }
  1112. break;
  1113. case 't':
  1114. sscanf(optarg,"%d",&fec_type);
  1115. if(fec_type!=0&&fec_type!=1)
  1116. {
  1117. mylog(log_fatal,"mode should be 0 or 1\n");
  1118. myexit(-1);
  1119. }
  1120. break;
  1121. case 'm':
  1122. sscanf(optarg,"%d",&fec_mtu);
  1123. if(fec_mtu<500||fec_mtu>1600)
  1124. {
  1125. mylog(log_fatal,"fec_mtu should be between 500 and 1600\n");
  1126. myexit(-1);
  1127. }
  1128. break;
  1129. case 'p':
  1130. sscanf(optarg,"%d",&fec_pending_time);
  1131. if(fec_pending_time<0||fec_pending_time>10000)
  1132. {
  1133. mylog(log_fatal,"fec_pending_time should be between 0 and 10000\n");
  1134. myexit(-1);
  1135. }
  1136. fec_pending_time*=100;
  1137. break;
  1138. case 'n':
  1139. sscanf(optarg,"%d",&fec_pending_num);
  1140. if(fec_pending_num<1||fec_pending_num>1000)
  1141. {
  1142. mylog(log_fatal,"fec_pending_num should be between 1 and 1000\n");
  1143. myexit(-1);
  1144. }
  1145. break;
  1146. /*
  1147. case 'd':
  1148. dup_num=-1;
  1149. sscanf(optarg,"%d\n",&dup_num);
  1150. if(dup_num<0 ||dup_num>5)
  1151. {
  1152. mylog(log_fatal,"dup_num must be between 0 and 5\n");
  1153. myexit(-1);
  1154. }
  1155. dup_num+=1;
  1156. break;*/
  1157. case 'c':
  1158. is_client = 1;
  1159. break;
  1160. case 's':
  1161. is_server = 1;
  1162. break;
  1163. case 'l':
  1164. no_l = 0;
  1165. if (strchr(optarg, ':') != 0)
  1166. {
  1167. sscanf(optarg, "%[^:]:%d", local_ip, &local_port);
  1168. }
  1169. else
  1170. {
  1171. mylog(log_fatal," -r ip:port\n");
  1172. myexit(1);
  1173. strcpy(local_ip, "127.0.0.1");
  1174. sscanf(optarg, "%d", &local_port);
  1175. }
  1176. break;
  1177. case 'r':
  1178. no_r = 0;
  1179. if (strchr(optarg, ':') != 0)
  1180. {
  1181. //printf("in :\n");
  1182. //printf("%s\n",optarg);
  1183. sscanf(optarg, "%[^:]:%d", remote_ip, &remote_port);
  1184. //printf("%d\n",remote_port);
  1185. }
  1186. else
  1187. {
  1188. mylog(log_fatal," -r ip:port\n");
  1189. myexit(1);
  1190. strcpy(remote_ip, "127.0.0.1");
  1191. sscanf(optarg, "%d", &remote_port);
  1192. }
  1193. break;
  1194. case 'h':
  1195. break;
  1196. case 1:
  1197. if(strcmp(long_options[option_index].name,"log-level")==0)
  1198. {
  1199. }
  1200. else if(strcmp(long_options[option_index].name,"disable-filter")==0)
  1201. {
  1202. disable_replay_filter=1;
  1203. //enable_log_color=0;
  1204. }
  1205. else if(strcmp(long_options[option_index].name,"disable-color")==0)
  1206. {
  1207. //enable_log_color=0;
  1208. }
  1209. else if(strcmp(long_options[option_index].name,"disable-fec")==0)
  1210. {
  1211. disable_fec=1;
  1212. }
  1213. else if(strcmp(long_options[option_index].name,"disable-obs")==0)
  1214. {
  1215. disable_obscure=1;
  1216. }
  1217. else if(strcmp(long_options[option_index].name,"disable-xor")==0)
  1218. {
  1219. disable_xor=1;
  1220. }
  1221. else if(strcmp(long_options[option_index].name,"disable-filter")==0)
  1222. {
  1223. disable_replay_filter=1;
  1224. }
  1225. else if(strcmp(long_options[option_index].name,"fix-latency")==0)
  1226. {
  1227. fix_latency=1;
  1228. }
  1229. else if(strcmp(long_options[option_index].name,"log-position")==0)
  1230. {
  1231. enable_log_position=1;
  1232. }
  1233. else if(strcmp(long_options[option_index].name,"random-drop")==0)
  1234. {
  1235. sscanf(optarg,"%d",&random_drop);
  1236. if(random_drop<0||random_drop>10000)
  1237. {
  1238. mylog(log_fatal,"random_drop must be between 0 10000 \n");
  1239. myexit(-1);
  1240. }
  1241. }
  1242. else if(strcmp(long_options[option_index].name,"report")==0)
  1243. {
  1244. sscanf(optarg,"%d",&report_interval);
  1245. if(report_interval<=0)
  1246. {
  1247. mylog(log_fatal,"report_interval must be >0 \n");
  1248. myexit(-1);
  1249. }
  1250. }
  1251. else if(strcmp(long_options[option_index].name,"sock-buf")==0)
  1252. {
  1253. int tmp=-1;
  1254. sscanf(optarg,"%d",&tmp);
  1255. if(10<=tmp&&tmp<=10*1024)
  1256. {
  1257. socket_buf_size=tmp*1024;
  1258. }
  1259. else
  1260. {
  1261. mylog(log_fatal,"sock-buf value must be between 1 and 10240 (kbyte) \n");
  1262. myexit(-1);
  1263. }
  1264. }
  1265. else
  1266. {
  1267. mylog(log_fatal,"unknown option\n");
  1268. myexit(-1);
  1269. }
  1270. break;
  1271. default:
  1272. mylog(log_fatal,"unknown option <%x>", opt);
  1273. myexit(-1);
  1274. }
  1275. }
  1276. if (no_l)
  1277. mylog(log_fatal,"error: -i not found\n");
  1278. if (no_r)
  1279. mylog(log_fatal,"error: -o not found\n");
  1280. if (no_l || no_r)
  1281. myexit(-1);
  1282. if (is_client == 0 && is_server == 0)
  1283. {
  1284. mylog(log_fatal,"-s -c hasnt been set\n");
  1285. myexit(-1);
  1286. }
  1287. if (is_client == 1 && is_server == 1)
  1288. {
  1289. mylog(log_fatal,"-s -c cant be both set\n");
  1290. myexit(-1);
  1291. }
  1292. if(is_client==1)
  1293. {
  1294. program_mode=client_mode;
  1295. }
  1296. else
  1297. {
  1298. program_mode=server_mode;
  1299. }
  1300. }
  1301. int main(int argc, char *argv[])
  1302. {
  1303. /*
  1304. if(argc==1||argc==0)
  1305. {
  1306. printf("this_program classic\n");
  1307. printf("this_program fec\n");
  1308. return 0;
  1309. }*/
  1310. /*
  1311. if(argc>=2&&strcmp(argv[1],"fec")!=0)
  1312. {
  1313. printf("running into classic mode!\n");
  1314. return classic::main(argc,argv);
  1315. }*/
  1316. assert(sizeof(u64_t)==8);
  1317. assert(sizeof(i64_t)==8);
  1318. assert(sizeof(u32_t)==4);
  1319. assert(sizeof(i32_t)==4);
  1320. assert(sizeof(u16_t)==2);
  1321. assert(sizeof(i16_t)==2);
  1322. dup2(1, 2); //redirect stderr to stdout
  1323. int i, j, k;
  1324. process_arg(argc,argv);
  1325. delay_manager.set_capacity(max_pending_packet);
  1326. local_ip_uint32=inet_addr(local_ip);
  1327. remote_ip_uint32=inet_addr(remote_ip);
  1328. if(program_mode==client_mode)
  1329. {
  1330. client_event_loop();
  1331. }
  1332. else
  1333. {
  1334. server_event_loop();
  1335. }
  1336. return 0;
  1337. }