main.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996
  1. #include "common.h"
  2. #include "log.h"
  3. #include "git_version.h"
  4. #include "lib/rs.h"
  5. #include "packet.h"
  6. #include "connection.h"
  7. #include "fd_manager.h"
  8. #include "delay_manager.h"
  9. #include "fec_manager.h"
  10. using namespace std;
  11. typedef unsigned long long u64_t; //this works on most platform,avoid using the PRId64
  12. typedef long long i64_t;
  13. typedef unsigned int u32_t;
  14. typedef int i32_t;
  15. int dup_num=1;
  16. int dup_delay_min=20; //0.1ms
  17. int dup_delay_max=20;
  18. int jitter_min=0;
  19. int jitter_max=0;
  20. //int random_number_fd=-1;
  21. int mtu_warn=1350;
  22. u32_t local_ip_uint32,remote_ip_uint32=0;
  23. char local_ip[100], remote_ip[100];
  24. int local_port = -1, remote_port = -1;
  25. u64_t last_report_time=0;
  26. int report_interval=0;
  27. //conn_manager_t conn_manager;
  28. delay_manager_t delay_manager;
  29. fd_manager_t fd_manager;
  30. const int disable_conv_clear=0;
  31. int socket_buf_size=1024*1024;
  32. int VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV;
  33. int init_listen_socket()
  34. {
  35. local_listen_fd =socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  36. int yes = 1;
  37. //setsockopt(udp_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
  38. struct sockaddr_in local_me={0};
  39. socklen_t slen = sizeof(sockaddr_in);
  40. //memset(&local_me, 0, sizeof(local_me));
  41. local_me.sin_family = AF_INET;
  42. local_me.sin_port = htons(local_port);
  43. local_me.sin_addr.s_addr = local_ip_uint32;
  44. if (bind(local_listen_fd, (struct sockaddr*) &local_me, slen) == -1) {
  45. mylog(log_fatal,"socket bind error\n");
  46. //perror("socket bind error");
  47. myexit(1);
  48. }
  49. setnonblocking(local_listen_fd);
  50. set_buf_size(local_listen_fd,socket_buf_size);
  51. return 0;
  52. }
  53. int new_connected_socket(int &fd,u32_t ip,int port)
  54. {
  55. char ip_port[40];
  56. sprintf(ip_port,"%s:%d",my_ntoa(ip),port);
  57. struct sockaddr_in remote_addr_in = { 0 };
  58. socklen_t slen = sizeof(sockaddr_in);
  59. //memset(&remote_addr_in, 0, sizeof(remote_addr_in));
  60. remote_addr_in.sin_family = AF_INET;
  61. remote_addr_in.sin_port = htons(port);
  62. remote_addr_in.sin_addr.s_addr = ip;
  63. fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  64. if (fd < 0) {
  65. mylog(log_warn, "[%s]create udp_fd error\n", ip_port);
  66. return -1;
  67. }
  68. setnonblocking(fd);
  69. set_buf_size(fd, socket_buf_size);
  70. mylog(log_debug, "[%s]created new udp_fd %d\n", ip_port, fd);
  71. int ret = connect(fd, (struct sockaddr *) &remote_addr_in, slen);
  72. if (ret != 0) {
  73. mylog(log_warn, "[%s]fd connect fail\n",ip_port);
  74. close(fd);
  75. return -1;
  76. }
  77. return 0;
  78. }
  79. int delay_send(my_time_t delay,const dest_t &dest,char *data,int len)
  80. {
  81. return delay_manager.add(delay,dest,data,len);;
  82. }
  83. int from_normal_to_fec(const dest_t &dest,char *data,int len)
  84. {
  85. delay_send(0,dest,data,len);
  86. delay_send(1000*1000,dest,data,len);
  87. return 0;
  88. }
  89. int from_fec_to_normal(const dest_t &dest,char *data,int len)
  90. {
  91. my_send(dest,data,len);
  92. return 0;
  93. }
  94. int client_event_loop()
  95. {
  96. //char buf[buf_len];
  97. int i, j, k;int ret;
  98. int yes = 1;
  99. int epoll_fd;
  100. int remote_fd;
  101. fd64_t remote_fd64;
  102. conn_info_t conn_info;
  103. init_listen_socket();
  104. epoll_fd = epoll_create1(0);
  105. const int max_events = 4096;
  106. struct epoll_event ev, events[max_events];
  107. if (epoll_fd < 0) {
  108. mylog(log_fatal,"epoll return %d\n", epoll_fd);
  109. myexit(-1);
  110. }
  111. ev.events = EPOLLIN;
  112. ev.data.u64 = local_listen_fd;
  113. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, local_listen_fd, &ev);
  114. if (ret!=0) {
  115. mylog(log_fatal,"add udp_listen_fd error\n");
  116. myexit(-1);
  117. }
  118. assert(new_connected_socket(remote_fd,remote_ip_uint32,remote_port)==0);
  119. remote_fd64=fd_manager.create(remote_fd);
  120. ev.events = EPOLLIN;
  121. ev.data.u64 = remote_fd64;
  122. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, remote_fd, &ev);
  123. if (ret!= 0) {
  124. mylog(log_fatal,"add raw_fd error\n");
  125. myexit(-1);
  126. }
  127. ev.events = EPOLLIN;
  128. ev.data.u64 = delay_manager.get_timer_fd();
  129. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
  130. if (ret!= 0) {
  131. mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
  132. myexit(-1);
  133. }
  134. while(1)////////////////////////
  135. {
  136. if(about_to_exit) myexit(0);
  137. int nfds = epoll_wait(epoll_fd, events, max_events, 180 * 1000);
  138. if (nfds < 0) { //allow zero
  139. if(errno==EINTR )
  140. {
  141. mylog(log_info,"epoll interrupted by signal\n");
  142. myexit(0);
  143. }
  144. else
  145. {
  146. mylog(log_fatal,"epoll_wait return %d\n", nfds);
  147. myexit(-1);
  148. }
  149. }
  150. int idx;
  151. for (idx = 0; idx < nfds; ++idx) {
  152. if (events[idx].data.u64 == (u64_t)local_listen_fd)
  153. {
  154. char data[buf_len];
  155. int data_len;
  156. struct sockaddr_in udp_new_addr_in={0};
  157. socklen_t udp_new_addr_len = sizeof(sockaddr_in);
  158. if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0,
  159. (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) {
  160. mylog(log_error,"recv_from error,this shouldnt happen at client\n");
  161. myexit(1);
  162. };
  163. if(data_len>=mtu_warn)
  164. {
  165. mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  166. }
  167. mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
  168. ntohs(udp_new_addr_in.sin_port),data_len);
  169. ip_port_t ip_port;
  170. ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
  171. ip_port.port=ntohs(udp_new_addr_in.sin_port);
  172. u64_t u64=ip_port.to_u64();
  173. u32_t conv;
  174. if(!conn_info.conv_manager.is_u64_used(u64))
  175. {
  176. if(conn_info.conv_manager.get_size() >=max_conv_num)
  177. {
  178. mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n");
  179. continue;
  180. }
  181. conv=conn_info.conv_manager.get_new_conv();
  182. conn_info.conv_manager.insert_conv(conv,u64);
  183. mylog(log_info,"new packet from %s:%d,conv_id=%x\n",inet_ntoa(udp_new_addr_in.sin_addr),ntohs(udp_new_addr_in.sin_port),conv);
  184. }
  185. else
  186. {
  187. conv=conn_info.conv_manager.find_conv_by_u64(u64);
  188. }
  189. conn_info.conv_manager.update_active_time(conv);
  190. dest_t dest;
  191. dest.type=type_fd64_conv;
  192. dest.inner.fd64=remote_fd64;
  193. dest.conv=conv;
  194. from_normal_to_fec(dest,data,data_len);
  195. //my_send(dest,data,data_len);
  196. }
  197. else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) {
  198. uint64_t value;
  199. read(delay_manager.get_timer_fd(), &value, 8);
  200. //printf("<timerfd_triggered, %d>",delay_mp.size());
  201. //fflush(stdout);
  202. }
  203. else if(events[idx].data.u64>u32_t(-1) )
  204. {
  205. char data[buf_len];
  206. if(!fd_manager.exist(events[idx].data.u64)) //fd64 has been closed
  207. {
  208. continue;
  209. }
  210. assert(events[idx].data.u64==remote_fd64);
  211. int fd=fd_manager.to_fd(remote_fd64);
  212. int data_len =recv(fd,data,max_data_len,0);
  213. mylog(log_trace, "received data from udp fd %d, len=%d\n", remote_fd,data_len);
  214. if(data_len<0)
  215. {
  216. if(errno==ECONNREFUSED)
  217. {
  218. //conn_manager.clear_list.push_back(udp_fd);
  219. mylog(log_debug, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
  220. }
  221. mylog(log_warn, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
  222. continue;
  223. }
  224. if(data_len>mtu_warn)
  225. {
  226. mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  227. }
  228. u32_t conv;
  229. char *new_data;
  230. int new_len;
  231. if(get_conv(conv,data,data_len,new_data,new_len)!=0)
  232. continue;
  233. if(!conn_info.conv_manager.is_conv_used(conv))continue;
  234. u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv);
  235. dest_t dest;
  236. dest.inner.ip_port.from_u64(u64);
  237. dest.type=type_ip_port;
  238. from_fec_to_normal(dest,new_data,new_len);
  239. mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s());
  240. }
  241. /*
  242. else if(events[idx].data.u64 ==(u64_t)timer_fd)
  243. {
  244. u64_t value;
  245. read(timer_fd, &value, 8);
  246. client_on_timer(conn_info);
  247. mylog(log_trace,"epoll_trigger_counter: %d \n",epoll_trigger_counter);
  248. epoll_trigger_counter=0;
  249. }*/
  250. else
  251. {
  252. mylog(log_fatal,"unknown fd,this should never happen\n");
  253. myexit(-1);
  254. }
  255. }
  256. delay_manager.check();
  257. }
  258. return 0;
  259. }
  260. int server_event_loop()
  261. {
  262. //char buf[buf_len];
  263. int i, j, k;int ret;
  264. int yes = 1;
  265. int epoll_fd;
  266. int remote_fd;
  267. conn_info_t conn_info;
  268. init_listen_socket();
  269. epoll_fd = epoll_create1(0);
  270. const int max_events = 4096;
  271. struct epoll_event ev, events[max_events];
  272. if (epoll_fd < 0) {
  273. mylog(log_fatal,"epoll return %d\n", epoll_fd);
  274. myexit(-1);
  275. }
  276. ev.events = EPOLLIN;
  277. ev.data.u64 = local_listen_fd;
  278. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, local_listen_fd, &ev);
  279. if (ret!=0) {
  280. mylog(log_fatal,"add udp_listen_fd error\n");
  281. myexit(-1);
  282. }
  283. ev.events = EPOLLIN;
  284. ev.data.u64 = delay_manager.get_timer_fd();
  285. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
  286. if (ret!= 0) {
  287. mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
  288. myexit(-1);
  289. }
  290. mylog(log_info,"now listening at %s:%d\n",my_ntoa(local_ip_uint32),local_port);
  291. while(1)////////////////////////
  292. {
  293. if(about_to_exit) myexit(0);
  294. int nfds = epoll_wait(epoll_fd, events, max_events, 180 * 1000);
  295. if (nfds < 0) { //allow zero
  296. if(errno==EINTR )
  297. {
  298. mylog(log_info,"epoll interrupted by signal\n");
  299. myexit(0);
  300. }
  301. else
  302. {
  303. mylog(log_fatal,"epoll_wait return %d\n", nfds);
  304. myexit(-1);
  305. }
  306. }
  307. int idx;
  308. for (idx = 0; idx < nfds; ++idx)
  309. {
  310. /*
  311. if ((events[idx].data.u64 ) == (u64_t)timer_fd)
  312. {
  313. conn_manager.clear_inactive();
  314. u64_t dummy;
  315. read(timer_fd, &dummy, 8);
  316. //current_time_rough=get_current_time();
  317. }
  318. else */if (events[idx].data.u64 == (u64_t)local_listen_fd)
  319. {
  320. //int recv_len;
  321. char data[buf_len];
  322. int data_len;
  323. struct sockaddr_in udp_new_addr_in={0};
  324. socklen_t udp_new_addr_len = sizeof(sockaddr_in);
  325. if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0,
  326. (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) {
  327. mylog(log_error,"recv_from error,this shouldnt happen at client\n");
  328. myexit(1);
  329. };
  330. if(data_len>=mtu_warn)
  331. {
  332. mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  333. }
  334. mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
  335. ntohs(udp_new_addr_in.sin_port),data_len);
  336. ip_port_t ip_port;
  337. ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
  338. ip_port.port=ntohs(udp_new_addr_in.sin_port);
  339. if(!conn_manager.exist(ip_port))
  340. {
  341. conn_manager.insert(ip_port);
  342. conn_info_t &conn_info=conn_manager.find(ip_port);
  343. conn_info.conv_manager.reserve();
  344. }
  345. conn_info_t &conn_info=conn_manager.find(ip_port);
  346. u32_t conv;
  347. char *new_data;
  348. int new_len;
  349. if(get_conv(conv,data,data_len,new_data,new_len)!=0)
  350. continue;
  351. /*
  352. id_t tmp_conv_id;
  353. memcpy(&tmp_conv_id,&data_[0],sizeof(tmp_conv_id));
  354. tmp_conv_id=ntohl(tmp_conv_id);*/
  355. if (!conn_info.conv_manager.is_conv_used(conv))
  356. {
  357. int new_udp_fd;
  358. ret=new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port);
  359. if (ret != 0) {
  360. mylog(log_warn, "[%s:%d]new_connected_socket failed\n",my_ntoa(ip_port.ip),ip_port.port);
  361. continue;
  362. }
  363. fd64_t fd64 = fd_manager.create(new_udp_fd);
  364. ev.events = EPOLLIN;
  365. ev.data.u64 = fd64;
  366. ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_udp_fd, &ev);
  367. conn_info.conv_manager.insert_conv(conv, fd64);
  368. fd_manager.get_info(fd64).ip_port=ip_port;
  369. //assert(!conn_manager.exist_fd64(fd64));
  370. //conn_manager.insert_fd64(fd64,ip_port);
  371. }
  372. fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv);
  373. //int fd=fd_manager.fd64_to_fd(fd64);
  374. dest_t dest;
  375. dest.type=type_fd64;
  376. dest.inner.fd64=fd64;
  377. from_fec_to_normal(dest,new_data,new_len);
  378. //int fd = int((u64 << 32u) >> 32u);
  379. //////////////////////////////todo
  380. //u64_t u64=((u64_t(udp_new_addr_in.sin_addr.s_addr))<<32u)+ntohs(udp_new_addr_in.sin_port);
  381. }
  382. /*
  383. else if ((events[idx].data.u64 >>32u) == 2u)
  384. {
  385. if(debug_flag)begin_time=get_current_time();
  386. int fd=get_u64_l(events[idx].data.u64);
  387. u64_t dummy;
  388. read(fd, &dummy, 8);
  389. if(conn_manager.timer_fd_mp.find(fd)==conn_manager.timer_fd_mp.end()) //this can happen,when fd is a just closed fd
  390. {
  391. mylog(log_info,"timer_fd no longer exits\n");
  392. continue;
  393. }
  394. conn_info_t* p_conn_info=conn_manager.timer_fd_mp[fd];
  395. u32_t ip=p_conn_info->raw_info.recv_info.src_ip;
  396. u32_t port=p_conn_info->raw_info.recv_info.src_port;
  397. assert(conn_manager.exist(ip,port));//TODO remove this for peformance
  398. assert(p_conn_info->state.server_current_state == server_ready); //TODO remove this for peformance
  399. //conn_info_t &conn_info=conn_manager.find(ip,port);
  400. char ip_port[40];
  401. sprintf(ip_port,"%s:%d",my_ntoa(ip),port);
  402. server_on_timer_multi(*p_conn_info,ip_port);
  403. if(debug_flag)
  404. {
  405. end_time=get_current_time();
  406. mylog(log_debug,"(events[idx].data.u64 >>32u) == 2u ,%llu,%llu,%llu \n",begin_time,end_time,end_time-begin_time);
  407. }
  408. }*/
  409. else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) {
  410. uint64_t value;
  411. read(delay_manager.get_timer_fd(), &value, 8);
  412. //printf("<timerfd_triggered, %d>",delay_mp.size());
  413. //fflush(stdout);
  414. }
  415. else if (events[idx].data.u64 >u32_t(-1))
  416. {
  417. char data[buf_len];
  418. int data_len;
  419. fd64_t fd64=events[idx].data.u64;
  420. if(!fd_manager.exist(fd64)) //fd64 has been closed
  421. {
  422. continue;
  423. }
  424. //assert(conn_manager.exist_fd64(fd64));
  425. assert(fd_manager.exist_info(fd64));
  426. ip_port_t ip_port=fd_manager.get_info(fd64).ip_port;
  427. assert(conn_manager.exist(ip_port));
  428. //conn_info_t* p_conn_info=conn_manager.find_insert_p(ip_port);
  429. conn_info_t &conn_info=conn_manager.find(ip_port);
  430. assert(conn_info.conv_manager.is_u64_used(fd64));
  431. u32_t conv=conn_info.conv_manager.find_conv_by_u64(fd64);
  432. int fd=fd_manager.to_fd(fd64);
  433. data_len=recv(fd,data,max_data_len,0);
  434. mylog(log_trace,"received a packet from udp_fd,len:%d\n",data_len);
  435. if(data_len<0)
  436. {
  437. mylog(log_debug,"udp fd,recv_len<0 continue,%s\n",strerror(errno));
  438. continue;
  439. }
  440. if(data_len>=mtu_warn)
  441. {
  442. mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
  443. }
  444. dest_t dest;
  445. dest.type=type_ip_port_conv;
  446. dest.conv=conv;
  447. dest.inner.ip_port=ip_port;
  448. from_normal_to_fec(dest,data,data_len);
  449. mylog(log_trace,"[%s] send packet\n",ip_port.to_s());
  450. }
  451. else
  452. {
  453. mylog(log_fatal,"unknown fd,this should never happen\n");
  454. myexit(-1);
  455. }
  456. }
  457. delay_manager.check();
  458. }
  459. return 0;
  460. }
  461. int unit_test()
  462. {
  463. int i,j,k;
  464. void *code=fec_new(3,6);
  465. char arr[6][100]=
  466. {
  467. "aaa","bbb","ccc"
  468. ,"ddd","eee","fff"
  469. };
  470. char *data[6];
  471. for(i=0;i<6;i++)
  472. {
  473. data[i]=arr[i];
  474. }
  475. rs_encode2(3,6,data,3);
  476. //printf("%d %d",(int)(unsigned char)arr[5][0],(int)('a'^'b'^'c'^'d'^'e'));
  477. for(i=0;i<6;i++)
  478. {
  479. printf("<%s>",data[i]);
  480. }
  481. data[0]=0;
  482. //data[1]=0;
  483. //data[5]=0;
  484. int ret=rs_decode2(3,6,data,3);
  485. printf("ret:%d\n",ret);
  486. for(i=0;i<6;i++)
  487. {
  488. printf("<%s>",data[i]);
  489. }
  490. fec_free(code);
  491. char arr2[6][100]=
  492. {
  493. "aaa11111","","ccc333333333"
  494. ,"ddd444","eee5555","ff6666"
  495. };
  496. blob_encode_t blob_encode;
  497. for(int i=0;i<6;i++)
  498. blob_encode.input(arr2[i],strlen(arr2[i]));
  499. char **output;
  500. int shard_len;
  501. blob_encode.output(7,output,shard_len);
  502. printf("<shard_len:%d>",shard_len);
  503. blob_decode_t blob_decode;
  504. for(int i=0;i<7;i++)
  505. {
  506. blob_decode.input(output[i],shard_len);
  507. }
  508. char **decode_output;
  509. int * len_arr;
  510. int num;
  511. ret=blob_decode.output(num,decode_output,len_arr);
  512. printf("<num:%d,ret:%d>\n",num,ret);
  513. for(int i=0;i<num;i++)
  514. {
  515. char buf[1000]={0};
  516. memcpy(buf,decode_output[i],len_arr[i]);
  517. printf("<%d:%s>",len_arr[i],buf);
  518. }
  519. printf("\n");
  520. fec_encode_manager_t fec_encode_manager;
  521. fec_decode_manager_t fec_decode_manager;
  522. return 0;
  523. }
  524. void print_help()
  525. {
  526. char git_version_buf[100]={0};
  527. strncpy(git_version_buf,gitversion,10);
  528. printf("UDPspeeder\n");
  529. printf("git version:%s ",git_version_buf);
  530. printf("build date:%s %s\n",__DATE__,__TIME__);
  531. printf("repository: https://github.com/wangyu-/UDPspeeder\n");
  532. printf("\n");
  533. printf("usage:\n");
  534. printf(" run as client : ./this_program -c -l local_listen_ip:local_port -r server_ip:server_port [options]\n");
  535. printf(" run as server : ./this_program -s -l server_listen_ip:server_port -r remote_ip:remote_port [options]\n");
  536. printf("\n");
  537. printf("common option,must be same on both sides:\n");
  538. printf(" -k,--key <string> key for simple xor encryption,default:\"secret key\"\n");
  539. printf("main options:\n");
  540. printf(" -d <number> duplicated packet number, -d 0 means no duplicate. default value:0\n");
  541. printf(" -t <number> duplicated packet delay time, unit: 0.1ms,default value:20(2ms)\n");
  542. printf(" -j <number> simulated jitter.randomly delay first packet for 0~jitter_value*0.1 ms,to\n");
  543. printf(" create simulated jitter.default value:0.do not use if you dont\n");
  544. printf(" know what it means\n");
  545. printf(" --report <number> turn on udp send/recv report,and set a time interval for reporting,unit:s\n");
  546. printf("advanced options:\n");
  547. printf(" -t tmin:tmax simliar to -t above,but delay randomly between tmin and tmax\n");
  548. printf(" -j jmin:jmax simliar to -j above,but create jitter randomly between jmin and jmax\n");
  549. printf(" --random-drop <number> simulate packet loss ,unit:0.01%%\n");
  550. printf(" --disable-filter disable duplicate packet filter.\n");
  551. printf(" -m <number> max pending packets,to prevent the program from eating up all your memory,\n");
  552. printf(" default value:0(disabled).\n");
  553. printf("other options:\n");
  554. printf(" --log-level <number> 0:never 1:fatal 2:error 3:warn \n");
  555. printf(" 4:info (default) 5:debug 6:trace\n");
  556. printf(" --log-position enable file name,function name,line number in log\n");
  557. printf(" --disable-color disable log color\n");
  558. printf(" --sock-buf <number> buf size for socket,>=10 and <=10240,unit:kbyte,default:1024\n");
  559. //printf(" -p use multi-process mode instead of epoll.very costly,only for test,do dont use\n");
  560. printf(" -h,--help print this help message\n");
  561. //printf("common options,these options must be same on both side\n");
  562. }
  563. void process_arg(int argc, char *argv[])
  564. {
  565. int is_client=0,is_server=0;
  566. int i, j, k;
  567. int opt;
  568. static struct option long_options[] =
  569. {
  570. {"log-level", required_argument, 0, 1},
  571. {"log-position", no_argument, 0, 1},
  572. {"disable-color", no_argument, 0, 1},
  573. {"disable-filter", no_argument, 0, 1},
  574. {"sock-buf", required_argument, 0, 1},
  575. {"random-drop", required_argument, 0, 1},
  576. {"report", required_argument, 0, 1},
  577. {NULL, 0, 0, 0}
  578. };
  579. int option_index = 0;
  580. if (argc == 1)
  581. {
  582. print_help();
  583. myexit( -1);
  584. }
  585. for (i = 0; i < argc; i++)
  586. {
  587. if(strcmp(argv[i],"--unit-test")==0)
  588. {
  589. unit_test();
  590. myexit(0);
  591. }
  592. }
  593. for (i = 0; i < argc; i++)
  594. {
  595. if(strcmp(argv[i],"-h")==0||strcmp(argv[i],"--help")==0)
  596. {
  597. print_help();
  598. myexit(0);
  599. }
  600. }
  601. for (i = 0; i < argc; i++)
  602. {
  603. if(strcmp(argv[i],"--log-level")==0)
  604. {
  605. if(i<argc -1)
  606. {
  607. sscanf(argv[i+1],"%d",&log_level);
  608. if(0<=log_level&&log_level<log_end)
  609. {
  610. }
  611. else
  612. {
  613. log_bare(log_fatal,"invalid log_level\n");
  614. myexit(-1);
  615. }
  616. }
  617. }
  618. if(strcmp(argv[i],"--disable-color")==0)
  619. {
  620. enable_log_color=0;
  621. }
  622. }
  623. mylog(log_info,"argc=%d ", argc);
  624. for (i = 0; i < argc; i++) {
  625. log_bare(log_info, "%s ", argv[i]);
  626. }
  627. log_bare(log_info, "\n");
  628. if (argc == 1)
  629. {
  630. print_help();
  631. myexit(-1);
  632. }
  633. int no_l = 1, no_r = 1;
  634. while ((opt = getopt_long(argc, argv, "l:r:d:t:hcspk:j:m:",long_options,&option_index)) != -1)
  635. {
  636. //string opt_key;
  637. //opt_key+=opt;
  638. switch (opt)
  639. {
  640. case 'p':
  641. //multi_process_mode=1;
  642. break;
  643. case 'k':
  644. sscanf(optarg,"%s\n",key_string);
  645. mylog(log_debug,"key=%s\n",key_string);
  646. if(strlen(key_string)==0)
  647. {
  648. mylog(log_fatal,"key len=0??\n");
  649. myexit(-1);
  650. }
  651. break;
  652. case 'm':
  653. sscanf(optarg,"%d\n",&max_pending_packet);
  654. if(max_pending_packet<1000)
  655. {
  656. mylog(log_fatal,"max_pending_packet must be >1000\n");
  657. myexit(-1);
  658. }
  659. break;
  660. case 'j':
  661. if (strchr(optarg, ':') == 0)
  662. {
  663. int jitter;
  664. sscanf(optarg,"%d\n",&jitter);
  665. if(jitter<0 ||jitter>1000*100)
  666. {
  667. mylog(log_fatal,"jitter must be between 0 and 100,000(10 second)\n");
  668. myexit(-1);
  669. }
  670. jitter_min=0;
  671. jitter_max=jitter;
  672. }
  673. else
  674. {
  675. sscanf(optarg,"%d:%d\n",&jitter_min,&jitter_max);
  676. if(jitter_min<0 ||jitter_max<0||jitter_min>jitter_max)
  677. {
  678. mylog(log_fatal," must satisfy 0<=jmin<=jmax\n");
  679. myexit(-1);
  680. }
  681. }
  682. break;
  683. case 't':
  684. if (strchr(optarg, ':') == 0)
  685. {
  686. int dup_delay=-1;
  687. sscanf(optarg,"%d\n",&dup_delay);
  688. if(dup_delay<1||dup_delay>1000*100)
  689. {
  690. mylog(log_fatal,"dup_delay must be between 1 and 100,000(10 second)\n");
  691. myexit(-1);
  692. }
  693. dup_delay_min=dup_delay_max=dup_delay;
  694. }
  695. else
  696. {
  697. sscanf(optarg,"%d:%d\n",&dup_delay_min,&dup_delay_max);
  698. if(dup_delay_min<1 ||dup_delay_max<1||dup_delay_min>dup_delay_max)
  699. {
  700. mylog(log_fatal," must satisfy 1<=dmin<=dmax\n");
  701. myexit(-1);
  702. }
  703. }
  704. break;
  705. case 'd':
  706. dup_num=-1;
  707. sscanf(optarg,"%d\n",&dup_num);
  708. if(dup_num<0 ||dup_num>5)
  709. {
  710. mylog(log_fatal,"dup_num must be between 0 and 5\n");
  711. myexit(-1);
  712. }
  713. dup_num+=1;
  714. break;
  715. case 'c':
  716. is_client = 1;
  717. break;
  718. case 's':
  719. is_server = 1;
  720. break;
  721. case 'l':
  722. no_l = 0;
  723. if (strchr(optarg, ':') != 0)
  724. {
  725. sscanf(optarg, "%[^:]:%d", local_ip, &local_port);
  726. }
  727. else
  728. {
  729. mylog(log_fatal," -r ip:port\n");
  730. myexit(1);
  731. strcpy(local_ip, "127.0.0.1");
  732. sscanf(optarg, "%d", &local_port);
  733. }
  734. break;
  735. case 'r':
  736. no_r = 0;
  737. if (strchr(optarg, ':') != 0)
  738. {
  739. //printf("in :\n");
  740. //printf("%s\n",optarg);
  741. sscanf(optarg, "%[^:]:%d", remote_ip, &remote_port);
  742. //printf("%d\n",remote_port);
  743. }
  744. else
  745. {
  746. mylog(log_fatal," -r ip:port\n");
  747. myexit(1);
  748. strcpy(remote_ip, "127.0.0.1");
  749. sscanf(optarg, "%d", &remote_port);
  750. }
  751. break;
  752. case 'h':
  753. break;
  754. case 1:
  755. if(strcmp(long_options[option_index].name,"log-level")==0)
  756. {
  757. }
  758. else if(strcmp(long_options[option_index].name,"disable-filter")==0)
  759. {
  760. disable_replay_filter=1;
  761. //enable_log_color=0;
  762. }
  763. else if(strcmp(long_options[option_index].name,"disable-color")==0)
  764. {
  765. //enable_log_color=0;
  766. }
  767. else if(strcmp(long_options[option_index].name,"log-position")==0)
  768. {
  769. enable_log_position=1;
  770. }
  771. else if(strcmp(long_options[option_index].name,"random-drop")==0)
  772. {
  773. sscanf(optarg,"%d",&random_drop);
  774. if(random_drop<0||random_drop>10000)
  775. {
  776. mylog(log_fatal,"random_drop must be between 0 10000 \n");
  777. myexit(-1);
  778. }
  779. }
  780. else if(strcmp(long_options[option_index].name,"report")==0)
  781. {
  782. sscanf(optarg,"%d",&report_interval);
  783. if(report_interval<=0)
  784. {
  785. mylog(log_fatal,"report_interval must be >0 \n");
  786. myexit(-1);
  787. }
  788. }
  789. else if(strcmp(long_options[option_index].name,"sock-buf")==0)
  790. {
  791. int tmp=-1;
  792. sscanf(optarg,"%d",&tmp);
  793. if(10<=tmp&&tmp<=10*1024)
  794. {
  795. socket_buf_size=tmp*1024;
  796. }
  797. else
  798. {
  799. mylog(log_fatal,"sock-buf value must be between 1 and 10240 (kbyte) \n");
  800. myexit(-1);
  801. }
  802. }
  803. else
  804. {
  805. mylog(log_fatal,"unknown option\n");
  806. myexit(-1);
  807. }
  808. break;
  809. default:
  810. mylog(log_fatal,"unknown option <%x>", opt);
  811. myexit(-1);
  812. }
  813. }
  814. if (no_l)
  815. mylog(log_fatal,"error: -i not found\n");
  816. if (no_r)
  817. mylog(log_fatal,"error: -o not found\n");
  818. if (no_l || no_r)
  819. myexit(-1);
  820. if (is_client == 0 && is_server == 0)
  821. {
  822. mylog(log_fatal,"-s -c hasnt been set\n");
  823. myexit(-1);
  824. }
  825. if (is_client == 1 && is_server == 1)
  826. {
  827. mylog(log_fatal,"-s -c cant be both set\n");
  828. myexit(-1);
  829. }
  830. if(is_client==1)
  831. {
  832. program_mode=client_mode;
  833. }
  834. else
  835. {
  836. program_mode=server_mode;
  837. }
  838. }
  839. int main(int argc, char *argv[])
  840. {
  841. /*
  842. if(argc==1||argc==0)
  843. {
  844. printf("this_program classic\n");
  845. printf("this_program fec\n");
  846. return 0;
  847. }*/
  848. /*
  849. if(argc>=2&&strcmp(argv[1],"fec")!=0)
  850. {
  851. printf("running into classic mode!\n");
  852. return classic::main(argc,argv);
  853. }*/
  854. assert(sizeof(u64_t)==8);
  855. assert(sizeof(i64_t)==8);
  856. assert(sizeof(u32_t)==4);
  857. assert(sizeof(i32_t)==4);
  858. dup2(1, 2); //redirect stderr to stdout
  859. int i, j, k;
  860. process_arg(argc,argv);
  861. delay_manager.capacity=max_pending_packet;
  862. init_random_number_fd();
  863. local_ip_uint32=inet_addr(local_ip);
  864. remote_ip_uint32=inet_addr(remote_ip);
  865. fd_manager.reserve(10007);
  866. if(program_mode==client_mode)
  867. {
  868. client_event_loop();
  869. }
  870. else
  871. {
  872. server_event_loop();
  873. }
  874. return 0;
  875. }