main.cpp 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351
  1. #include "common.h"
  2. #include "log.h"
  3. #include "git_version.h"
  4. #include "lib/rs.h"
  5. #include "packet.h"
  6. #include "connection.h"
  7. #include "fd_manager.h"
  8. #include "delay_manager.h"
  9. #include "fec_manager.h"
  // ---- File-scope type aliases and runtime configuration ----
  10. using namespace std;
  // Integer aliases; the long long based ones are printed with %llu/%lld style formats in this file.
  11. typedef unsigned long long u64_t; //this works on most platform,avoid using the PRId64
  12. typedef long long i64_t;
  13. typedef unsigned int u32_t;
  14. typedef int i32_t;
  // Packet duplication / jitter settings. They are not read in this part of the file;
  // presumably they are filled in by option parsing further down (see the -d / -t help text) -- TODO confirm.
  15. int dup_num=1;
  16. int dup_delay_min=20; //0.1ms
  17. int dup_delay_max=20;
  18. int jitter_min=0;
  19. int jitter_max=0;
  20. //int random_number_fd=-1;
  // A warning is logged for received packets whose length reaches mtu_warn, unless disable_mtu_warn is non-zero.
  21. int mtu_warn=1350;
  22. int disable_mtu_warn=0;
  // When disable_fec is non-zero, from_normal_to_fec()/from_fec_to_normal() pass packets through unchanged.
  23. int disable_fec=1;
  // The fec_* values below are handed as a group to fec_encode_manager.re_init() by both event loops.
  24. int fec_data_num=20;
  25. int fec_redundant_num=10;
  26. int fec_mtu=1000;
  27. int fec_pending_num=30;
  28. int fec_pending_time=10000;
  29. int fec_type=0;
  // Listen address (used by init_listen_socket()) and remote address (used by new_connected_socket() callers).
  // The *_uint32 values are assigned directly to sin_addr.s_addr; the ports go through htons().
  30. u32_t local_ip_uint32,remote_ip_uint32=0;
  31. char local_ip[100], remote_ip[100];
  32. int local_port = -1, remote_port = -1;
  // Not referenced in this part of the file.
  33. u64_t last_report_time=0;
  34. int report_interval=0;
  35. //conn_manager_t conn_manager;
  // Shared helpers: delay_manager queues outgoing packets (delay_send()) and is polled via check() in the event loops;
  // fd_manager maps raw fds to the 64-bit handles (fd64) stored in epoll event data.
  36. delay_manager_t delay_manager;
  37. fd_manager_t fd_manager;
  // Not referenced in this part of the file.
  38. const int disable_conv_clear=0;
  // Buffer size passed to set_buf_size() for every socket created in this file.
  39. int socket_buf_size=1024*1024;
  // NOTE(review): looks like a visual separator between the globals and the functions; it is not referenced in this part of the file.
  40. int VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV;
  41. int init_listen_socket()
  42. {
  43. local_listen_fd =socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  44. int yes = 1;
  45. //setsockopt(udp_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
  46. struct sockaddr_in local_me={0};
  47. socklen_t slen = sizeof(sockaddr_in);
  48. //memset(&local_me, 0, sizeof(local_me));
  49. local_me.sin_family = AF_INET;
  50. local_me.sin_port = htons(local_port);
  51. local_me.sin_addr.s_addr = local_ip_uint32;
  52. if (bind(local_listen_fd, (struct sockaddr*) &local_me, slen) == -1) {
  53. mylog(log_fatal,"socket bind error\n");
  54. //perror("socket bind error");
  55. myexit(1);
  56. }
  57. setnonblocking(local_listen_fd);
  58. set_buf_size(local_listen_fd,socket_buf_size);
  59. mylog(log_debug,"local_listen_fd=%d\n,",local_listen_fd);
  60. return 0;
  61. }
  62. int new_connected_socket(int &fd,u32_t ip,int port)
  63. {
  64. char ip_port[40];
  65. sprintf(ip_port,"%s:%d",my_ntoa(ip),port);
  66. struct sockaddr_in remote_addr_in = { 0 };
  67. socklen_t slen = sizeof(sockaddr_in);
  68. //memset(&remote_addr_in, 0, sizeof(remote_addr_in));
  69. remote_addr_in.sin_family = AF_INET;
  70. remote_addr_in.sin_port = htons(port);
  71. remote_addr_in.sin_addr.s_addr = ip;
  72. fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  73. if (fd < 0) {
  74. mylog(log_warn, "[%s]create udp_fd error\n", ip_port);
  75. return -1;
  76. }
  77. setnonblocking(fd);
  78. set_buf_size(fd, socket_buf_size);
  79. mylog(log_debug, "[%s]created new udp_fd %d\n", ip_port, fd);
  80. int ret = connect(fd, (struct sockaddr *) &remote_addr_in, slen);
  81. if (ret != 0) {
  82. mylog(log_warn, "[%s]fd connect fail\n",ip_port);
  83. close(fd);
  84. return -1;
  85. }
  86. return 0;
  87. }
  88. int delay_send(my_time_t delay,const dest_t &dest,char *data,int len)
  89. {
  90. int rand=random()%100;
  91. //mylog(log_info,"rand = %d\n",rand);
  92. if (dest.cook&&random_drop != 0) {
  93. if (get_true_random_number() % 10000 < (u32_t) random_drop) {
  94. return 0;
  95. }
  96. }
  97. return delay_manager.add(delay,dest,data,len);;
  98. }
  99. int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,int *&out_delay)
  100. {
  101. static int out_delay_buf[max_fec_packet_num+100]={0};
  102. //static int out_len_buf[max_fec_packet_num+100]={0};
  103. static int counter=0;
  104. out_delay=out_delay_buf;
  105. //out_len=out_len_buf;
  106. if(disable_fec)
  107. {
  108. if(data==0) return 0;
  109. out_n=1;
  110. static char *data_static;
  111. data_static=data;
  112. static int len_static;
  113. len_static=len;
  114. out_arr=&data_static;
  115. out_len=&len_static;
  116. }
  117. else
  118. {
  119. counter++;
  120. conn_info.fec_encode_manager.input(data,len);
  121. //if(counter%5==0)
  122. //conn_info.fec_encode_manager.input(0,0);
  123. //int n;
  124. //char **s_arr;
  125. //int s_len;
  126. conn_info.fec_encode_manager.output(out_n,out_arr,out_len);
  127. for(int i=0;i<out_n;i++)
  128. {
  129. out_delay_buf[i]=0;
  130. }
  131. }
  132. mylog(log_debug,"from_normal_to_fec input_len=%d,output_n=%d\n",len,out_n);
  133. //for(int i=0;i<n;i++)
  134. //{
  135. //delay_send(0,dest,s_arr[i],s_len);
  136. //}
  137. //delay_send(0,dest,data,len);
  138. //delay_send(1000*1000,dest,data,len);
  139. return 0;
  140. }
  141. int from_fec_to_normal(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,int *&out_delay)
  142. {
  143. static int out_delay_buf[max_fec_pending_packet_num+100]={0};
  144. out_delay=out_delay_buf;
  145. if(disable_fec)
  146. {
  147. if(data==0) return 0;
  148. out_n=1;
  149. static char *data_static;
  150. data_static=data;
  151. static int len_static;
  152. len_static=len;
  153. out_arr=&data_static;
  154. out_len=&len_static;
  155. //out_len_buf[0]=len;
  156. }
  157. else
  158. {
  159. conn_info.fec_decode_manager.input(data,len);
  160. //int n;char ** s_arr;int* len_arr;
  161. conn_info.fec_decode_manager.output(out_n,out_arr,out_len);
  162. for(int i=0;i<out_n;i++)
  163. {
  164. out_delay_buf[i]=0;
  165. }
  166. }
  167. mylog(log_debug,"from_fec_to_normal input_len=%d,output_n=%d\n",len,out_n);
  168. // printf("<n:%d>",n);
  169. /*
  170. for(int i=0;i<n;i++)
  171. {
  172. delay_send(0,dest,s_arr[i],len_arr[i]);
  173. //s_arr[i][len_arr[i]]=0;
  174. //printf("<%s>\n",s_arr[i]);
  175. }*/
  176. //my_send(dest,data,len);
  177. return 0;
  178. }
  179. int client_event_loop()
  180. {
  181. //char buf[buf_len];
  182. int i, j, k;int ret;
  183. int yes = 1;
  184. int epoll_fd;
  185. int remote_fd;
  186. fd64_t remote_fd64;
  187. conn_info_t *conn_info_p=new conn_info_t;
  188. conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack
  189. //conn_info.conv_manager.reserve();
  190. conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type);
  191. init_listen_socket();
  192. epoll_fd = epoll_create1(0);
  193. assert(epoll_fd>0);
  194. const int max_events = 4096;
  195. struct epoll_event ev, events[max_events];
  196. if (epoll_fd < 0) {
  197. mylog(log_fatal,"epoll return %d\n", epoll_fd);
  198. myexit(-1);
  199. }
  200. ev.events = EPOLLIN;
  201. ev.data.u64 = local_listen_fd;
  202. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, local_listen_fd, &ev);
  203. if (ret!=0) {
  204. mylog(log_fatal,"add udp_listen_fd error\n");
  205. myexit(-1);
  206. }
  207. assert(new_connected_socket(remote_fd,remote_ip_uint32,remote_port)==0);
  208. remote_fd64=fd_manager.create(remote_fd);
  209. mylog(log_debug,"remote_fd64=%llu\n",remote_fd64);
  210. ev.events = EPOLLIN;
  211. ev.data.u64 = remote_fd64;
  212. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, remote_fd, &ev);
  213. if (ret!= 0) {
  214. mylog(log_fatal,"add raw_fd error\n");
  215. myexit(-1);
  216. }
  217. ev.events = EPOLLIN;
  218. ev.data.u64 = delay_manager.get_timer_fd();
  219. mylog(log_debug,"delay_manager.get_timer_fd()=%d\n",delay_manager.get_timer_fd());
  220. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
  221. if (ret!= 0) {
  222. mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
  223. myexit(-1);
  224. }
  225. u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64();
  226. ev.events = EPOLLIN;
  227. ev.data.u64 = fd64;
  228. mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64());
  229. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev);
  230. if (ret!= 0) {
  231. mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n");
  232. myexit(-1);
  233. }
  234. //my_timer_t timer;
  235. conn_info.timer.add_fd_to_epoll(epoll_fd);
  236. conn_info.timer.set_timer_repeat_us(timer_interval*1000);
  237. mylog(log_debug,"conn_info.timer.get_timer_fd()=%d\n",conn_info.timer.get_timer_fd());
  238. while(1)////////////////////////
  239. {
  240. if(about_to_exit) myexit(0);
  241. int nfds = epoll_wait(epoll_fd, events, max_events, 180 * 1000);
  242. if (nfds < 0) { //allow zero
  243. if(errno==EINTR )
  244. {
  245. mylog(log_info,"epoll interrupted by signal\n");
  246. myexit(0);
  247. }
  248. else
  249. {
  250. mylog(log_fatal,"epoll_wait return %d\n", nfds);
  251. myexit(-1);
  252. }
  253. }
  254. int idx;
  255. for (idx = 0; idx < nfds; ++idx) {
  256. if(events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd())
  257. {
  258. uint64_t value;
  259. read(conn_info.timer.get_timer_fd(), &value, 8);
  260. conn_info.conv_manager.clear_inactive();
  261. mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n");
  262. }
  263. else if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64())
  264. {
  265. char data[buf_len];
  266. int data_len;
  267. ip_port_t ip_port;
  268. u32_t conv;
  269. int out_n;char **out_arr;int *out_len;int *out_delay;
  270. dest_t dest;
  271. dest.type=type_fd64;
  272. dest.inner.fd64=remote_fd64;
  273. dest.cook=1;
  274. if(events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64())
  275. {
  276. mylog(log_trace,"events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()\n");
  277. //mylog(log_info,"timer!!!\n");
  278. uint64_t value;
  279. if(!fd_manager.exist(fd64)) //fd64 has been closed
  280. {
  281. mylog(log_trace,"!fd_manager.exist(fd64)");
  282. continue;
  283. }
  284. if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8)
  285. {
  286. mylog(log_trace,"(ret=read(fd_manager.to_fd(fd64), &value, 8))!=8,ret=%d\n",ret);
  287. continue;
  288. }
  289. if(value==0)
  290. {
  291. mylog(log_debug,"value==0\n");
  292. continue;
  293. }
  294. assert(value==1);
  295. from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
  296. //from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
  297. }
  298. else//events[idx].data.u64 == (u64_t)local_listen_fd
  299. {
  300. mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n");
  301. struct sockaddr_in udp_new_addr_in={0};
  302. socklen_t udp_new_addr_len = sizeof(sockaddr_in);
  303. if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0,
  304. (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) {
  305. mylog(log_error,"recv_from error,this shouldnt happen at client\n");
  306. myexit(1);
  307. };
  308. if(!disable_mtu_warn&&data_len>=mtu_warn)
  309. {
  310. mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  311. }
  312. mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
  313. ntohs(udp_new_addr_in.sin_port),data_len);
  314. ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
  315. ip_port.port=ntohs(udp_new_addr_in.sin_port);
  316. u64_t u64=ip_port.to_u64();
  317. if(!conn_info.conv_manager.is_u64_used(u64))
  318. {
  319. if(conn_info.conv_manager.get_size() >=max_conv_num)
  320. {
  321. mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n");
  322. continue;
  323. }
  324. conv=conn_info.conv_manager.get_new_conv();
  325. conn_info.conv_manager.insert_conv(conv,u64);
  326. mylog(log_info,"new packet from %s:%d,conv_id=%x\n",inet_ntoa(udp_new_addr_in.sin_addr),ntohs(udp_new_addr_in.sin_port),conv);
  327. }
  328. else
  329. {
  330. conv=conn_info.conv_manager.find_conv_by_u64(u64);
  331. mylog(log_trace,"conv=%d\n",conv);
  332. }
  333. conn_info.conv_manager.update_active_time(conv);
  334. char * new_data;
  335. int new_len;
  336. put_conv(conv,data,data_len,new_data,new_len);
  337. //dest.conv=conv;
  338. from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay);
  339. }
  340. mylog(log_trace,"out_n=%d\n",out_n);
  341. for(int i=0;i<out_n;i++)
  342. {
  343. delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
  344. }
  345. //my_send(dest,data,data_len);
  346. }
  347. else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) {
  348. uint64_t value;
  349. read(delay_manager.get_timer_fd(), &value, 8);
  350. mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n");
  351. //printf("<timerfd_triggered, %d>",delay_mp.size());
  352. //fflush(stdout);
  353. }
  354. else if(events[idx].data.u64>u32_t(-1) )
  355. {
  356. char data[buf_len];
  357. if(!fd_manager.exist(events[idx].data.u64)) //fd64 has been closed
  358. {
  359. mylog(log_trace,"!fd_manager.exist(events[idx].data.u64)");
  360. continue;
  361. }
  362. assert(events[idx].data.u64==remote_fd64);
  363. int fd=fd_manager.to_fd(remote_fd64);
  364. int data_len =recv(fd,data,max_data_len,0);
  365. mylog(log_trace, "received data from udp fd %d, len=%d\n", remote_fd,data_len);
  366. if(data_len<0)
  367. {
  368. if(errno==ECONNREFUSED)
  369. {
  370. //conn_manager.clear_list.push_back(udp_fd);
  371. mylog(log_debug, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
  372. }
  373. mylog(log_warn, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
  374. continue;
  375. }
  376. if(!disable_mtu_warn&&data_len>mtu_warn)
  377. {
  378. mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  379. }
  380. if(de_cook(data,data_len)!=0)
  381. {
  382. mylog(log_debug,"de_cook error");
  383. continue;
  384. }
  385. int out_n;char **out_arr;int *out_len;int *out_delay;
  386. from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay);
  387. mylog(log_trace,"out_n=%d\n",out_n);
  388. for(int i=0;i<out_n;i++)
  389. {
  390. u32_t conv;
  391. char *new_data;
  392. int new_len;
  393. if(get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0)
  394. {
  395. mylog(log_debug,"get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0");
  396. continue;
  397. }
  398. if(!conn_info.conv_manager.is_conv_used(conv))
  399. {
  400. mylog(log_trace,"!conn_info.conv_manager.is_conv_used(conv)");
  401. }
  402. conn_info.conv_manager.update_active_time(conv);
  403. u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv);
  404. dest_t dest;
  405. dest.inner.ip_port.from_u64(u64);
  406. dest.type=type_ip_port;
  407. //dest.conv=conv;
  408. delay_send(out_delay[i],dest,new_data,new_len);
  409. }
  410. //mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s());
  411. }
  412. else
  413. {
  414. mylog(log_fatal,"unknown fd,this should never happen\n");
  415. myexit(-1);
  416. }
  417. }
  418. delay_manager.check();
  419. }
  420. return 0;
  421. }
  422. int server_event_loop()
  423. {
  424. //char buf[buf_len];
  425. int i, j, k;int ret;
  426. int yes = 1;
  427. int epoll_fd;
  428. int remote_fd;
  429. // conn_info_t conn_info;
  430. init_listen_socket();
  431. epoll_fd = epoll_create1(0);
  432. assert(epoll_fd>0);
  433. const int max_events = 4096;
  434. struct epoll_event ev, events[max_events];
  435. if (epoll_fd < 0) {
  436. mylog(log_fatal,"epoll return %d\n", epoll_fd);
  437. myexit(-1);
  438. }
  439. ev.events = EPOLLIN;
  440. ev.data.u64 = local_listen_fd;
  441. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, local_listen_fd, &ev);
  442. if (ret!=0) {
  443. mylog(log_fatal,"add udp_listen_fd error\n");
  444. myexit(-1);
  445. }
  446. ev.events = EPOLLIN;
  447. ev.data.u64 = delay_manager.get_timer_fd();
  448. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
  449. if (ret!= 0) {
  450. mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
  451. myexit(-1);
  452. }
  453. mylog(log_debug," delay_manager.get_timer_fd() =%d\n", delay_manager.get_timer_fd());
  454. mylog(log_info,"now listening at %s:%d\n",my_ntoa(local_ip_uint32),local_port);
  455. my_timer_t timer;
  456. timer.add_fd_to_epoll(epoll_fd);
  457. timer.set_timer_repeat_us(timer_interval*1000);
  458. mylog(log_debug," timer.get_timer_fd() =%d\n",timer.get_timer_fd());
  459. while(1)////////////////////////
  460. {
  461. if(about_to_exit) myexit(0);
  462. int nfds = epoll_wait(epoll_fd, events, max_events, 180 * 1000);
  463. if (nfds < 0) { //allow zero
  464. if(errno==EINTR )
  465. {
  466. mylog(log_info,"epoll interrupted by signal\n");
  467. myexit(0);
  468. }
  469. else
  470. {
  471. mylog(log_fatal,"epoll_wait return %d\n", nfds);
  472. myexit(-1);
  473. }
  474. }
  475. int idx;
  476. for (idx = 0; idx < nfds; ++idx)
  477. {
  478. /*
  479. if ((events[idx].data.u64 ) == (u64_t)timer_fd)
  480. {
  481. conn_manager.clear_inactive();
  482. u64_t dummy;
  483. read(timer_fd, &dummy, 8);
  484. //current_time_rough=get_current_time();
  485. }
  486. else */
  487. if(events[idx].data.u64==(u64_t)timer.get_timer_fd())
  488. {
  489. uint64_t value;
  490. read(timer.get_timer_fd(), &value, 8);
  491. conn_manager.clear_inactive();
  492. mylog(log_trace,"events[idx].data.u64==(u64_t)timer.get_timer_fd()\n");
  493. //conn_info.conv_manager.clear_inactive();
  494. }
  495. else if (events[idx].data.u64 == (u64_t)local_listen_fd)
  496. {
  497. mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n");
  498. //int recv_len;
  499. char data[buf_len];
  500. int data_len;
  501. struct sockaddr_in udp_new_addr_in={0};
  502. socklen_t udp_new_addr_len = sizeof(sockaddr_in);
  503. if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0,
  504. (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) {
  505. mylog(log_error,"recv_from error,this shouldnt happen at client\n");
  506. myexit(1);
  507. };
  508. mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
  509. ntohs(udp_new_addr_in.sin_port),data_len);
  510. if(!disable_mtu_warn&&data_len>=mtu_warn)///////////////////////delete this for type 0 in furture
  511. {
  512. mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  513. }
  514. if(de_cook(data,data_len)!=0)
  515. {
  516. mylog(log_debug,"de_cook error");
  517. continue;
  518. }
  519. ip_port_t ip_port;
  520. ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
  521. ip_port.port=ntohs(udp_new_addr_in.sin_port);
  522. mylog(log_trace,"ip_port= %s\n",ip_port.to_s());
  523. if(!conn_manager.exist(ip_port))
  524. {
  525. if(conn_manager.mp.size() >=max_conn_num)
  526. {
  527. mylog(log_warn,"new connection %s ignored bc max_conn_num exceed\n",ip_port.to_s());
  528. continue;
  529. }
  530. conn_manager.insert(ip_port);
  531. conn_info_t &conn_info=conn_manager.find(ip_port);
  532. conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type);
  533. //conn_info.conv_manager.reserve(); //already reserved in constructor
  534. u64_t fec_fd64=conn_info.fec_encode_manager.get_timer_fd64();
  535. mylog(log_debug,"fec_fd64=%llu\n",fec_fd64);
  536. ev.events = EPOLLIN;
  537. ev.data.u64 = fec_fd64;
  538. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fec_fd64), &ev);
  539. fd_manager.get_info(fec_fd64).ip_port=ip_port;
  540. conn_info.timer.add_fd64_to_epoll(epoll_fd);
  541. conn_info.timer.set_timer_repeat_us(timer_interval*100);
  542. mylog(log_debug,"conn_info.timer.get_timer_fd64()=%llu\n",conn_info.timer.get_timer_fd64());
  543. u64_t timer_fd64=conn_info.timer.get_timer_fd64();
  544. fd_manager.get_info(timer_fd64).ip_port=ip_port;
  545. mylog(log_info,"new connection from %s\n",ip_port.to_s());
  546. }
  547. conn_info_t &conn_info=conn_manager.find(ip_port);
  548. conn_info.update_active_time();
  549. int out_n;char **out_arr;int *out_len;int *out_delay;
  550. from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay);
  551. mylog(log_trace,"out_n= %d\n",out_n);
  552. for(int i=0;i<out_n;i++)
  553. {
  554. u32_t conv;
  555. char *new_data;
  556. int new_len;
  557. if(get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0)
  558. {
  559. mylog(log_debug,"get_conv failed");
  560. continue;
  561. }
  562. /*
  563. id_t tmp_conv_id;
  564. memcpy(&tmp_conv_id,&data_[0],sizeof(tmp_conv_id));
  565. tmp_conv_id=ntohl(tmp_conv_id);*/
  566. if (!conn_info.conv_manager.is_conv_used(conv))
  567. {
  568. if(conn_info.conv_manager.get_size() >=max_conv_num)
  569. {
  570. mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n");
  571. continue;
  572. }
  573. int new_udp_fd;
  574. ret=new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port);
  575. if (ret != 0) {
  576. mylog(log_warn, "[%s]new_connected_socket failed\n",ip_port.to_s());
  577. continue;
  578. }
  579. fd64_t fd64 = fd_manager.create(new_udp_fd);
  580. ev.events = EPOLLIN;
  581. ev.data.u64 = fd64;
  582. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_udp_fd, &ev);
  583. conn_info.conv_manager.insert_conv(conv, fd64);
  584. fd_manager.get_info(fd64).ip_port=ip_port;
  585. mylog(log_info,"[%s]new conv %x,fd %d created,fd64=%llu\n",ip_port.to_s(),conv,new_udp_fd,fd64);
  586. //assert(!conn_manager.exist_fd64(fd64));
  587. //conn_manager.insert_fd64(fd64,ip_port);
  588. }
  589. conn_info.conv_manager.update_active_time(conv);
  590. fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv);
  591. //int fd=fd_manager.fd64_to_fd(fd64);
  592. dest_t dest;
  593. dest.type=type_fd64;
  594. dest.inner.fd64=fd64;
  595. //dest.conv=conv;
  596. delay_send(out_delay[i],dest,new_data,new_len);
  597. }
  598. }
  599. else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) {
  600. uint64_t value;
  601. read(delay_manager.get_timer_fd(), &value, 8);
  602. mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n");
  603. }
  604. else if (events[idx].data.u64 >u32_t(-1))
  605. {
  606. char data[buf_len];
  607. int data_len;
  608. u32_t conv;
  609. fd64_t fd64=events[idx].data.u64;
  610. mylog(log_trace,"events[idx].data.u64 >u32_t(-1),%llu\n",(u64_t)events[idx].data.u64);
  611. if(!fd_manager.exist(fd64)) //fd64 has been closed
  612. {
  613. mylog(log_trace,"!fd_manager.exist(fd64)\n");
  614. continue;
  615. }
  616. assert(fd_manager.exist_info(fd64));
  617. ip_port_t ip_port=fd_manager.get_info(fd64).ip_port;
  618. assert(conn_manager.exist(ip_port));
  619. conn_info_t &conn_info=conn_manager.find(ip_port);
  620. //conn_info.update_active_time(); //cant put it here
  621. int out_n=-2;char **out_arr;int *out_len;int *out_delay;
  622. dest_t dest;
  623. dest.type=type_ip_port;
  624. //dest.conv=conv;
  625. dest.inner.ip_port=ip_port;
  626. dest.cook=1;
  627. if(fd64==conn_info.fec_encode_manager.get_timer_fd64())
  628. {
  629. //mylog(log_infol,"timer!!!\n");
  630. uint64_t value;
  631. if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8)
  632. {
  633. mylog(log_trace,"fd_manager.to_fd(fd64), &value, 8)!=8 ,%d\n",ret);
  634. continue;
  635. }
  636. if(value==0)
  637. {
  638. mylog(log_trace,"value==0\n");
  639. continue;
  640. }
  641. assert(value==1);
  642. from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
  643. }
  644. else if(fd64==conn_info.timer.get_timer_fd64())
  645. {
  646. uint64_t value;
  647. read(conn_info.timer.get_timer_fd(), &value, 8);
  648. conn_info.conv_manager.clear_inactive();
  649. continue;
  650. }
  651. else
  652. {
  653. assert(conn_info.conv_manager.is_u64_used(fd64));
  654. conv=conn_info.conv_manager.find_conv_by_u64(fd64);
  655. conn_info.conv_manager.update_active_time(conv);
  656. conn_info.update_active_time();
  657. int fd=fd_manager.to_fd(fd64);
  658. data_len=recv(fd,data,max_data_len,0);
  659. mylog(log_trace,"received a packet from udp_fd,len:%d,conv=%d\n",data_len,conv);
  660. if(data_len<0)
  661. {
  662. mylog(log_debug,"udp fd,recv_len<0 continue,%s\n",strerror(errno));
  663. continue;
  664. }
  665. if(!disable_mtu_warn&&data_len>=mtu_warn)
  666. {
  667. mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  668. }
  669. char * new_data;
  670. int new_len;
  671. put_conv(conv,data,data_len,new_data,new_len);
  672. from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay);
  673. }
  674. mylog(log_trace,"out_n=%d\n",out_n);
  675. for(int i=0;i<out_n;i++)
  676. {
  677. delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
  678. }
  679. //mylog(log_trace,"[%s] send packet\n",ip_port.to_s());
  680. }
  681. else
  682. {
  683. mylog(log_fatal,"unknown fd,this should never happen\n");
  684. myexit(-1);
  685. }
  686. }
  687. delay_manager.check();
  688. }
  689. return 0;
  690. }
  691. int unit_test()
  692. {
  693. int i,j,k;
  694. void *code=fec_new(3,6);
  695. char arr[6][100]=
  696. {
  697. "aaa","bbb","ccc"
  698. ,"ddd","eee","fff"
  699. };
  700. char *data[6];
  701. for(i=0;i<6;i++)
  702. {
  703. data[i]=arr[i];
  704. }
  705. rs_encode2(3,6,data,3);
  706. //printf("%d %d",(int)(unsigned char)arr[5][0],(int)('a'^'b'^'c'^'d'^'e'));
  707. for(i=0;i<6;i++)
  708. {
  709. printf("<%s>",data[i]);
  710. }
  711. data[0]=0;
  712. //data[1]=0;
  713. //data[5]=0;
  714. int ret=rs_decode2(3,6,data,3);
  715. printf("ret:%d\n",ret);
  716. for(i=0;i<6;i++)
  717. {
  718. printf("<%s>",data[i]);
  719. }
  720. fec_free(code);
  721. char arr2[6][100]=
  722. {
  723. "aaa11111","","ccc333333333"
  724. ,"ddd444","eee5555","ff6666"
  725. };
  726. blob_encode_t blob_encode;
  727. for(int i=0;i<6;i++)
  728. blob_encode.input(arr2[i],strlen(arr2[i]));
  729. char **output;
  730. int shard_len;
  731. blob_encode.output(7,output,shard_len);
  732. printf("<shard_len:%d>",shard_len);
  733. blob_decode_t blob_decode;
  734. for(int i=0;i<7;i++)
  735. {
  736. blob_decode.input(output[i],shard_len);
  737. }
  738. char **decode_output;
  739. int * len_arr;
  740. int num;
  741. ret=blob_decode.output(num,decode_output,len_arr);
  742. printf("<num:%d,ret:%d>\n",num,ret);
  743. for(int i=0;i<num;i++)
  744. {
  745. char buf[1000]={0};
  746. memcpy(buf,decode_output[i],len_arr[i]);
  747. printf("<%d:%s>",len_arr[i],buf);
  748. }
  749. printf("\n");
  750. static fec_encode_manager_t fec_encode_manager;
  751. static fec_decode_manager_t fec_decode_manager;
  752. {
  753. string a = "11111";
  754. string b = "22";
  755. string c = "33333333";
  756. fec_encode_manager.input((char *) a.c_str(), a.length());
  757. fec_encode_manager.input((char *) b.c_str(), b.length());
  758. fec_encode_manager.input((char *) c.c_str(), c.length());
  759. fec_encode_manager.input(0, 0);
  760. int n;
  761. char **s_arr;
  762. int *len;
  763. fec_encode_manager.output(n,s_arr,len);
  764. printf("<n:%d,len:%d>",n,len[0]);
  765. for(int i=0;i<n;i++)
  766. {
  767. fec_decode_manager.input(s_arr[i],len[i]);
  768. }
  769. {
  770. int n;char ** s_arr;int* len_arr;
  771. fec_decode_manager.output(n,s_arr,len_arr);
  772. printf("<n:%d>",n);
  773. for(int i=0;i<n;i++)
  774. {
  775. s_arr[i][len_arr[i]]=0;
  776. printf("<%s>\n",s_arr[i]);
  777. }
  778. }
  779. }
  780. {
  781. string a = "aaaaaaa";
  782. string b = "bbbbbbbbbbbbb";
  783. string c = "ccc";
  784. fec_encode_manager.input((char *) a.c_str(), a.length());
  785. fec_encode_manager.input((char *) b.c_str(), b.length());
  786. fec_encode_manager.input((char *) c.c_str(), c.length());
  787. fec_encode_manager.input(0, 0);
  788. int n;
  789. char **s_arr;
  790. int * len;
  791. fec_encode_manager.output(n,s_arr,len);
  792. printf("<n:%d,len:%d>",n,len[0]);
  793. for(int i=0;i<n;i++)
  794. {
  795. if(i==1||i==3||i==5||i==0)
  796. fec_decode_manager.input(s_arr[i],len[i]);
  797. }
  798. {
  799. int n;char ** s_arr;int* len_arr;
  800. fec_decode_manager.output(n,s_arr,len_arr);
  801. printf("<n:%d>",n);
  802. for(int i=0;i<n;i++)
  803. {
  804. s_arr[i][len_arr[i]]=0;
  805. printf("<%s>\n",s_arr[i]);
  806. }
  807. }
  808. }
  809. return 0;
  810. }
  811. void print_help()
  812. {
  813. char git_version_buf[100]={0};
  814. strncpy(git_version_buf,gitversion,10);
  815. printf("UDPspeeder\n");
  816. printf("git version:%s ",git_version_buf);
  817. printf("build date:%s %s\n",__DATE__,__TIME__);
  818. printf("repository: https://github.com/wangyu-/UDPspeeder\n");
  819. printf("\n");
  820. printf("usage:\n");
  821. printf(" run as client : ./this_program -c -l local_listen_ip:local_port -r server_ip:server_port [options]\n");
  822. printf(" run as server : ./this_program -s -l server_listen_ip:server_port -r remote_ip:remote_port [options]\n");
  823. printf("\n");
  824. printf("common option,must be same on both sides:\n");
  825. printf(" -k,--key <string> key for simple xor encryption,default:\"secret key\"\n");
  826. printf("main options:\n");
  827. printf(" -d <number> duplicated packet number, -d 0 means no duplicate. default value:0\n");
  828. printf(" -t <number> duplicated packet delay time, unit: 0.1ms,default value:20(2ms)\n");
  829. printf(" -j <number> simulated jitter.randomly delay first packet for 0~jitter_value*0.1 ms,to\n");
  830. printf(" create simulated jitter.default value:0.do not use if you dont\n");
  831. printf(" know what it means\n");
  832. printf(" --report <number> turn on udp send/recv report,and set a time interval for reporting,unit:s\n");
  833. printf("advanced options:\n");
  834. printf(" -t tmin:tmax simliar to -t above,but delay randomly between tmin and tmax\n");
  835. printf(" -j jmin:jmax simliar to -j above,but create jitter randomly between jmin and jmax\n");
  836. printf(" --random-drop <number> simulate packet loss ,unit:0.01%%\n");
  837. printf(" --disable-filter disable duplicate packet filter.\n");
  838. printf(" -m <number> max pending packets,to prevent the program from eating up all your memory,\n");
  839. printf(" default value:0(disabled).\n");
  840. printf("other options:\n");
  841. printf(" --log-level <number> 0:never 1:fatal 2:error 3:warn \n");
  842. printf(" 4:info (default) 5:debug 6:trace\n");
  843. printf(" --log-position enable file name,function name,line number in log\n");
  844. printf(" --disable-color disable log color\n");
  845. printf(" --sock-buf <number> buf size for socket,>=10 and <=10240,unit:kbyte,default:1024\n");
  846. //printf(" -p use multi-process mode instead of epoll.very costly,only for test,do dont use\n");
  847. printf(" -h,--help print this help message\n");
  848. //printf("common options,these options must be same on both side\n");
  849. }
  850. void process_arg(int argc, char *argv[])
  851. {
  852. int is_client=0,is_server=0;
  853. int i, j, k;
  854. int opt;
  855. static struct option long_options[] =
  856. {
  857. {"log-level", required_argument, 0, 1},
  858. {"log-position", no_argument, 0, 1},
  859. {"disable-color", no_argument, 0, 1},
  860. {"disable-filter", no_argument, 0, 1},
  861. {"sock-buf", required_argument, 0, 1},
  862. {"random-drop", required_argument, 0, 1},
  863. {"report", required_argument, 0, 1},
  864. {NULL, 0, 0, 0}
  865. };
  866. int option_index = 0;
  867. if (argc == 1)
  868. {
  869. print_help();
  870. myexit( -1);
  871. }
  872. for (i = 0; i < argc; i++)
  873. {
  874. if(strcmp(argv[i],"--unit-test")==0)
  875. {
  876. unit_test();
  877. myexit(0);
  878. }
  879. }
  880. for (i = 0; i < argc; i++)
  881. {
  882. if(strcmp(argv[i],"-h")==0||strcmp(argv[i],"--help")==0)
  883. {
  884. print_help();
  885. myexit(0);
  886. }
  887. }
  888. for (i = 0; i < argc; i++)
  889. {
  890. if(strcmp(argv[i],"--log-level")==0)
  891. {
  892. if(i<argc -1)
  893. {
  894. sscanf(argv[i+1],"%d",&log_level);
  895. if(0<=log_level&&log_level<log_end)
  896. {
  897. }
  898. else
  899. {
  900. log_bare(log_fatal,"invalid log_level\n");
  901. myexit(-1);
  902. }
  903. }
  904. }
  905. if(strcmp(argv[i],"--disable-color")==0)
  906. {
  907. enable_log_color=0;
  908. }
  909. }
  910. mylog(log_info,"argc=%d ", argc);
  911. for (i = 0; i < argc; i++) {
  912. log_bare(log_info, "%s ", argv[i]);
  913. }
  914. log_bare(log_info, "\n");
  915. if (argc == 1)
  916. {
  917. print_help();
  918. myexit(-1);
  919. }
  920. int no_l = 1, no_r = 1;
  921. while ((opt = getopt_long(argc, argv, "l:r:d:t:hcspk:j:m:",long_options,&option_index)) != -1)
  922. {
  923. //string opt_key;
  924. //opt_key+=opt;
  925. switch (opt)
  926. {
  927. case 'p':
  928. //multi_process_mode=1;
  929. break;
  930. case 'k':
  931. sscanf(optarg,"%s\n",key_string);
  932. mylog(log_debug,"key=%s\n",key_string);
  933. if(strlen(key_string)==0)
  934. {
  935. mylog(log_fatal,"key len=0??\n");
  936. myexit(-1);
  937. }
  938. break;
  939. case 'm':
  940. sscanf(optarg,"%d\n",&max_pending_packet);
  941. if(max_pending_packet<1000)
  942. {
  943. mylog(log_fatal,"max_pending_packet must be >1000\n");
  944. myexit(-1);
  945. }
  946. break;
  947. case 'j':
  948. if (strchr(optarg, ':') == 0)
  949. {
  950. int jitter;
  951. sscanf(optarg,"%d\n",&jitter);
  952. if(jitter<0 ||jitter>1000*100)
  953. {
  954. mylog(log_fatal,"jitter must be between 0 and 100,000(10 second)\n");
  955. myexit(-1);
  956. }
  957. jitter_min=0;
  958. jitter_max=jitter;
  959. }
  960. else
  961. {
  962. sscanf(optarg,"%d:%d\n",&jitter_min,&jitter_max);
  963. if(jitter_min<0 ||jitter_max<0||jitter_min>jitter_max)
  964. {
  965. mylog(log_fatal," must satisfy 0<=jmin<=jmax\n");
  966. myexit(-1);
  967. }
  968. }
  969. break;
  970. case 't':
  971. sscanf(optarg,"%d",&fec_type);
  972. /*
  973. if (strchr(optarg, ':') == 0)
  974. {
  975. int dup_delay=-1;
  976. sscanf(optarg,"%d\n",&dup_delay);
  977. if(dup_delay<1||dup_delay>1000*100)
  978. {
  979. mylog(log_fatal,"dup_delay must be between 1 and 100,000(10 second)\n");
  980. myexit(-1);
  981. }
  982. dup_delay_min=dup_delay_max=dup_delay;
  983. }
  984. else
  985. {
  986. sscanf(optarg,"%d:%d\n",&dup_delay_min,&dup_delay_max);
  987. if(dup_delay_min<1 ||dup_delay_max<1||dup_delay_min>dup_delay_max)
  988. {
  989. mylog(log_fatal," must satisfy 1<=dmin<=dmax\n");
  990. myexit(-1);
  991. }
  992. }*/
  993. break;
  994. case 'd':
  995. dup_num=-1;
  996. sscanf(optarg,"%d\n",&dup_num);
  997. if(dup_num<0 ||dup_num>5)
  998. {
  999. mylog(log_fatal,"dup_num must be between 0 and 5\n");
  1000. myexit(-1);
  1001. }
  1002. dup_num+=1;
  1003. break;
  1004. case 'c':
  1005. is_client = 1;
  1006. break;
  1007. case 's':
  1008. is_server = 1;
  1009. break;
  1010. case 'l':
  1011. no_l = 0;
  1012. if (strchr(optarg, ':') != 0)
  1013. {
  1014. sscanf(optarg, "%[^:]:%d", local_ip, &local_port);
  1015. }
  1016. else
  1017. {
  1018. mylog(log_fatal," -r ip:port\n");
  1019. myexit(1);
  1020. strcpy(local_ip, "127.0.0.1");
  1021. sscanf(optarg, "%d", &local_port);
  1022. }
  1023. break;
  1024. case 'r':
  1025. no_r = 0;
  1026. if (strchr(optarg, ':') != 0)
  1027. {
  1028. //printf("in :\n");
  1029. //printf("%s\n",optarg);
  1030. sscanf(optarg, "%[^:]:%d", remote_ip, &remote_port);
  1031. //printf("%d\n",remote_port);
  1032. }
  1033. else
  1034. {
  1035. mylog(log_fatal," -r ip:port\n");
  1036. myexit(1);
  1037. strcpy(remote_ip, "127.0.0.1");
  1038. sscanf(optarg, "%d", &remote_port);
  1039. }
  1040. break;
  1041. case 'h':
  1042. break;
  1043. case 1:
  1044. if(strcmp(long_options[option_index].name,"log-level")==0)
  1045. {
  1046. }
  1047. else if(strcmp(long_options[option_index].name,"disable-filter")==0)
  1048. {
  1049. disable_replay_filter=1;
  1050. //enable_log_color=0;
  1051. }
  1052. else if(strcmp(long_options[option_index].name,"disable-color")==0)
  1053. {
  1054. //enable_log_color=0;
  1055. }
  1056. else if(strcmp(long_options[option_index].name,"log-position")==0)
  1057. {
  1058. enable_log_position=1;
  1059. }
  1060. else if(strcmp(long_options[option_index].name,"random-drop")==0)
  1061. {
  1062. sscanf(optarg,"%d",&random_drop);
  1063. if(random_drop<0||random_drop>10000)
  1064. {
  1065. mylog(log_fatal,"random_drop must be between 0 10000 \n");
  1066. myexit(-1);
  1067. }
  1068. }
  1069. else if(strcmp(long_options[option_index].name,"report")==0)
  1070. {
  1071. sscanf(optarg,"%d",&report_interval);
  1072. if(report_interval<=0)
  1073. {
  1074. mylog(log_fatal,"report_interval must be >0 \n");
  1075. myexit(-1);
  1076. }
  1077. }
  1078. else if(strcmp(long_options[option_index].name,"sock-buf")==0)
  1079. {
  1080. int tmp=-1;
  1081. sscanf(optarg,"%d",&tmp);
  1082. if(10<=tmp&&tmp<=10*1024)
  1083. {
  1084. socket_buf_size=tmp*1024;
  1085. }
  1086. else
  1087. {
  1088. mylog(log_fatal,"sock-buf value must be between 1 and 10240 (kbyte) \n");
  1089. myexit(-1);
  1090. }
  1091. }
  1092. else
  1093. {
  1094. mylog(log_fatal,"unknown option\n");
  1095. myexit(-1);
  1096. }
  1097. break;
  1098. default:
  1099. mylog(log_fatal,"unknown option <%x>", opt);
  1100. myexit(-1);
  1101. }
  1102. }
  1103. if (no_l)
  1104. mylog(log_fatal,"error: -i not found\n");
  1105. if (no_r)
  1106. mylog(log_fatal,"error: -o not found\n");
  1107. if (no_l || no_r)
  1108. myexit(-1);
  1109. if (is_client == 0 && is_server == 0)
  1110. {
  1111. mylog(log_fatal,"-s -c hasnt been set\n");
  1112. myexit(-1);
  1113. }
  1114. if (is_client == 1 && is_server == 1)
  1115. {
  1116. mylog(log_fatal,"-s -c cant be both set\n");
  1117. myexit(-1);
  1118. }
  1119. if(is_client==1)
  1120. {
  1121. program_mode=client_mode;
  1122. }
  1123. else
  1124. {
  1125. program_mode=server_mode;
  1126. }
  1127. }
  1128. int main(int argc, char *argv[])
  1129. {
  1130. /*
  1131. if(argc==1||argc==0)
  1132. {
  1133. printf("this_program classic\n");
  1134. printf("this_program fec\n");
  1135. return 0;
  1136. }*/
  1137. /*
  1138. if(argc>=2&&strcmp(argv[1],"fec")!=0)
  1139. {
  1140. printf("running into classic mode!\n");
  1141. return classic::main(argc,argv);
  1142. }*/
  1143. assert(sizeof(u64_t)==8);
  1144. assert(sizeof(i64_t)==8);
  1145. assert(sizeof(u32_t)==4);
  1146. assert(sizeof(i32_t)==4);
  1147. assert(sizeof(u16_t)==2);
  1148. assert(sizeof(i16_t)==2);
  1149. dup2(1, 2); //redirect stderr to stdout
  1150. int i, j, k;
  1151. process_arg(argc,argv);
  1152. delay_manager.capacity=max_pending_packet;
  1153. local_ip_uint32=inet_addr(local_ip);
  1154. remote_ip_uint32=inet_addr(remote_ip);
  1155. fd_manager.reserve(10007);
  1156. if(program_mode==client_mode)
  1157. {
  1158. client_event_loop();
  1159. }
  1160. else
  1161. {
  1162. server_event_loop();
  1163. }
  1164. return 0;
  1165. }