#include "tun_dev.h"

// Destination describing the UDP side: the listening socket plus the address
// of the currently accepted client (cleared until a client connects).
static dest_t udp_dest;
// Destination describing the tun device file descriptor.
static dest_t tun_dest;

/**
 * libev read callback for the UDP listening socket.
 *
 * Receives one datagram, runs de_cook() on it, reads its header byte and then
 * decides what to do based on the sender:
 *   - sender equals the current client: keep-alives are dropped, anything that
 *     is neither header_new_connect nor header_normal is rejected;
 *   - sender is someone else: header_new_connect makes it the new client and
 *     resets the FEC/statistics state, header_normal is answered with a
 *     header_reject packet, everything else is dropped.
 * Packets that pass these checks are handed to from_fec_to_normal2() with
 * tun_dest as the destination.
 *
 * @param loop     event loop that fired the watcher (unused)
 * @param watcher  io watcher; watcher->data must point to the conn_info_t
 * @param revents  libev event mask; must not contain EV_ERROR
 */
static void local_listen_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) {
    char data[buf_len];
    int len;
    assert(!(revents & EV_ERROR));

    conn_info_t &conn_info = *((conn_info_t *)watcher->data);
    int local_listen_fd = watcher->fd;

    address_t::storage_t udp_new_addr_in = {0};
    socklen_t udp_new_addr_len = sizeof(address_t::storage_t);

    // Request one byte more than the maximum payload so that an oversized
    // datagram can be recognised by its returned length.
    if ((len = recvfrom(local_listen_fd, data, max_data_len + 1, 0, (struct sockaddr *)&udp_new_addr_in, &udp_new_addr_len)) < 0) {
        mylog(log_error, "recv_from error,this shouldnt happen,err=%s,but we can try to continue\n", strerror(errno));
        return;
    }

    address_t new_addr;
    new_addr.from_sockaddr((struct sockaddr *)&udp_new_addr_in, udp_new_addr_len);

    if (len == max_data_len + 1) {
        mylog(log_warn, "huge packet, data_len > %d,dropped\n", max_data_len);
        return;
    }

    if (de_cook(data, len) < 0) {
        mylog(log_warn, "de_cook(data,len)failed \n");
        return;
    }

    char header = 0;
    if (get_header(header, data, len) != 0) {
        mylog(log_warn, "get_header() failed\n");
        return;
    }

    if (udp_dest.inner.fd_addr.addr == new_addr) {
        // Packet from the client we are already talking to.
        if (header == header_keep_alive) {
            mylog(log_trace, "got keep_alive packet\n");
            return;
        }
        if (header != header_new_connect && header != header_normal) {
            mylog(log_warn, "invalid header\n");
            return;
        }
    } else {
        // Packet from an address that is not the current client.
        if (header == header_keep_alive) {
            mylog(log_debug, "got keep_alive packet from unexpected client\n");
            return;
        }
        if (header == header_new_connect) {
            // Switch over to the new client and start from a clean state.
            mylog(log_info, "new connection from %s\n", new_addr.get_str());
            udp_dest.inner.fd_addr.addr = new_addr;
            conn_info.fec_decode_manager.clear();
            conn_info.fec_encode_manager.clear_data();
            conn_info.stat.clear();
        } else if (header == header_normal) {
            // Data from a client that never sent header_new_connect: answer
            // with a single-byte reject packet and drop the data.
            mylog(log_debug, "rejected connection from %s\n", new_addr.get_str());
            len = 1;
            data[0] = header_reject;
            do_cook(data, len);

            dest_t tmp_dest;
            tmp_dest.type = type_fd_addr;
            tmp_dest.inner.fd_addr.fd = local_listen_fd;
            tmp_dest.inner.fd_addr.addr = new_addr;

            delay_manager.add(0, tmp_dest, data, len);
            return;
        } else {
            mylog(log_warn, "invalid header\n");
            // Drop the packet, as the known-client branch above does; without
            // this return an unrecognised packet from an unknown sender would
            // be fed into the current connection's decoder.
            return;
        }
    }

    mylog(log_trace, "Received packet from %s,len: %d\n", new_addr.get_str(), len);

    from_fec_to_normal2(conn_info, tun_dest, data, len);
}

/**
 * libev read callback for the tun device.
 *
 * Reads one packet from the tun fd, passes it through do_mssfix() and, if a
 * client address is known, forwards it via from_normal_to_fec2() towards
 * udp_dest with header_normal.
 *
 * @param loop     event loop that fired the watcher (unused)
 * @param watcher  io watcher; watcher->data must point to the conn_info_t
 * @param revents  libev event mask; must not contain EV_ERROR
 */
static void tun_fd_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) {
    char data[buf_len];
    int len;
    assert(!(revents & EV_ERROR));

    conn_info_t &conn_info = *((conn_info_t *)watcher->data);
    int tun_fd = watcher->fd;

    // As with the UDP socket, read one extra byte to detect oversized packets.
    len = read(tun_fd, data, max_data_len + 1);

    if (len < 0) {
        mylog(log_warn, "read from tun_fd return %d,errno=%s\n", len, strerror(errno));
        return;
    }

    if (len == max_data_len + 1) {
        mylog(log_warn, "huge packet, data_len > %d,dropped\n", max_data_len);
        return;
    }

    do_mssfix(data, len);

    mylog(log_trace, "Received packet from tun,len: %d\n", len);

    if (udp_dest.inner.fd_addr.addr.is_vaild() == 0) {
        mylog(log_debug, "received packet from tun,but there is no client yet,dropped packet\n");
        return;
    }

    from_normal_to_fec2(conn_info, udp_dest, data, len, header_normal);
}

/**
 * Timer callback registered with delay_manager; it only validates the event
 * mask.
 */
static void delay_manager_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents) {
    assert(!(revents & EV_ERROR));
    // do nothing
}

/**
 * Timer callback registered with the FEC encode manager.
 *
 * Calls from_normal_to_fec2() with no payload (null data, zero length).
 * NOTE(review): presumably this flushes data pending in the encoder - confirm
 * against from_normal_to_fec2().
 *
 * @param watcher  timer watcher; watcher->data must point to the conn_info_t
 * @param revents  libev event mask; must not contain EV_ERROR
 */
static void fec_encode_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents) {
    assert(!(revents & EV_ERROR));
    mylog(log_trace, "fec_encode_cb() called\n");

    conn_info_t &conn_info = *((conn_info_t *)watcher->data);

    // The callback asserts that a client address is already set.
    assert(udp_dest.inner.fd_addr.addr.is_vaild() != 0);

    from_normal_to_fec2(conn_info, udp_dest, 0, 0, header_normal);
}

/**
 * libev read callback for the command fifo.
 *
 * Reads whatever is available from the fifo, NUL-terminates it and passes the
 * resulting string to handle_command().
 *
 * @param watcher  io watcher whose fd is the fifo
 * @param revents  libev event mask; must not contain EV_ERROR
 */
static void fifo_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) {
    assert(!(revents & EV_ERROR));
    int fifo_fd = watcher->fd;

    char buf[buf_len];
    // Leave room for the terminating NUL: reading sizeof(buf) bytes would make
    // buf[len] below write one past the end of the array.
    int len = read(fifo_fd, buf, sizeof(buf) - 1);
    if (len < 0) {
        mylog(log_warn, "fifo read failed len=%d,errno=%s\n", len, strerror(errno));
        return;
    }
    buf[len] = 0;
    handle_command(buf);
}

/**
 * Periodic connection timer callback.
 *
 * Does nothing until a client address is known; afterwards it reports the
 * statistics via stat.report_as_server() and sends a keep-alive to the client.
 *
 * @param watcher  timer watcher; watcher->data must point to the conn_info_t
 * @param revents  libev event mask; must not contain EV_ERROR
 */
static void conn_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents) {
    assert(!(revents & EV_ERROR));

    conn_info_t &conn_info = *((conn_info_t *)watcher->data);

    mylog(log_trace, "conn_timer_cb() called\n");

    if (udp_dest.inner.fd_addr.addr.is_vaild() == 0) {
        return;
    }

    conn_info.stat.report_as_server(udp_dest.inner.fd_addr.addr);
    do_keep_alive(udp_dest);
}

/**
 * libev prepare callback; runs delay_manager.check() each time it is invoked
 * by the loop.
 */
static void prepare_cb(struct ev_loop *loop, struct ev_prepare *watcher, int revents) {
    assert(!(revents & EV_ERROR));

    delay_manager.check();
}

/**
 * Sets up the tun device and the UDP listening socket, registers all libev
 * watchers (UDP socket, tun fd, delay manager, FEC encode timer, connection
 * timer, optional command fifo, prepare hook) and runs the default event loop.
 *
 * @return 0; in practice myexit(0) is called once ev_run() returns.
 */
int tun_dev_server_event_loop() {
    int ret;
    int tun_fd;
    int local_listen_fd;

    // conn_info_t is huge, so it is allocated on the heap instead of the stack.
    conn_info_t *conn_info_p = new conn_info_t;
    conn_info_t &conn_info = *conn_info_p;

    tun_fd = get_tun_fd(tun_dev);
    assert(tun_fd > 0);

    // The setup calls are made outside of assert(): with NDEBUG defined an
    // assert() expression is not evaluated, which would skip the calls
    // entirely and leave local_listen_fd uninitialised.
    ret = new_listen_socket2(local_listen_fd, local_addr);
    assert(ret == 0);

    // The two addresses are the configured subnet with its last octet
    // replaced by 1 and 2 respectively.
    ret = set_tun(tun_dev, htonl((ntohl(sub_net_uint32) & 0xFFFFFF00) | 1), htonl((ntohl(sub_net_uint32) & 0xFFFFFF00) | 2), tun_mtu);
    assert(ret == 0);
    (void)ret;  // silence "set but unused" when assertions are compiled out

    udp_dest.cook = 1;
    udp_dest.type = type_fd_addr;
    udp_dest.inner.fd_addr.fd = local_listen_fd;
    udp_dest.inner.fd_addr.addr.clear();  // no client yet

    tun_dest.type = type_write_fd;
    tun_dest.inner.fd = tun_fd;

    struct ev_loop *loop = ev_default_loop(0);
    assert(loop != NULL);

    conn_info.loop = loop;

    // UDP listening socket.
    struct ev_io local_listen_watcher;
    local_listen_watcher.data = &conn_info;
    ev_io_init(&local_listen_watcher, local_listen_cb, local_listen_fd, EV_READ);
    ev_io_start(loop, &local_listen_watcher);

    // tun device.
    struct ev_io tun_fd_watcher;
    tun_fd_watcher.data = &conn_info;
    ev_io_init(&tun_fd_watcher, tun_fd_cb, tun_fd, EV_READ);
    ev_io_start(loop, &tun_fd_watcher);

    delay_manager.set_loop_and_cb(loop, delay_manager_cb);

    conn_info.fec_encode_manager.set_data(&conn_info);
    conn_info.fec_encode_manager.set_loop_and_cb(loop, fec_encode_cb);

    // Repeating connection timer; timer_interval is divided by 1000 to get
    // the repeat value in seconds that ev_timer_set() expects.
    conn_info.timer.data = &conn_info;
    ev_init(&conn_info.timer, conn_timer_cb);
    ev_timer_set(&conn_info.timer, 0, timer_interval / 1000.0);
    ev_timer_start(loop, &conn_info.timer);

    // Optional command fifo, only when a fifo path has been configured.
    struct ev_io fifo_watcher;
    int fifo_fd = -1;
    if (fifo_file[0] != 0) {
        fifo_fd = create_fifo(fifo_file);
        ev_io_init(&fifo_watcher, fifo_cb, fifo_fd, EV_READ);
        ev_io_start(loop, &fifo_watcher);

        mylog(log_info, "fifo_file=%s\n", fifo_file);
    }

    ev_prepare prepare_watcher;
    ev_init(&prepare_watcher, prepare_cb);
    ev_prepare_start(loop, &prepare_watcher);

    mylog(log_info, "now listening at %s\n", local_addr.get_str());

    ev_run(loop, 0);

    mylog(log_warn, "ev_run returned\n");
    myexit(0);

    return 0;
}