|
@@ -286,7 +286,7 @@ impl DataServer {
|
|
|
plugin.notify_tx = Some(tx);
|
|
|
plugin.notify_rx = Mutex::new(Some(rx));
|
|
|
|
|
|
- let (tx, rx) = mpsc::channel(1024 * 32);
|
|
|
+ let (tx, rx) = mpsc::channel(1024 * 256);
|
|
|
plugin.data_tx = Some(tx);
|
|
|
plugin.data_rx = Mutex::new(Some(rx));
|
|
|
|
|
@@ -670,7 +670,10 @@ impl DataServer {
|
|
|
|
|
|
this.stat.clone().start_worker()?;
|
|
|
|
|
|
- let mut req_list: Vec<Box<dyn DnsRequest>> = Vec::new();
|
|
|
+ let req_list_size = if batch_mode { 1024 * 16 } else { 1 };
|
|
|
+ let mut req_list: Vec<Box<dyn DnsRequest>> = Vec::with_capacity(req_list_size);
|
|
|
+ let recv_buffer_size = if batch_mode { 4096 } else { 1 };
|
|
|
+ let mut recv_buffer = Vec::with_capacity(recv_buffer_size);
|
|
|
let mut batch_timer: Option<tokio::time::Interval> = None;
|
|
|
let mut check_timer = tokio::time::interval(Duration::from_secs(60));
|
|
|
let is_check_timer_running = Arc::new(AtomicBool::new(false));
|
|
@@ -707,32 +710,29 @@ impl DataServer {
|
|
|
req_list.clear();
|
|
|
batch_timer = None;
|
|
|
}
|
|
|
- res = data_rx.recv() => {
|
|
|
- match res {
|
|
|
- Some(req) => {
|
|
|
- req_list.push(req);
|
|
|
-
|
|
|
- if batch_mode {
|
|
|
- if req_list.len() == 1 {
|
|
|
- batch_timer = Some(tokio::time::interval_at(
|
|
|
- Instant::now() + Duration::from_millis(500),
|
|
|
- Duration::from_secs(1),
|
|
|
- ));
|
|
|
- }
|
|
|
-
|
|
|
- if req_list.len() <= 65535 {
|
|
|
- continue;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- DataServer::data_server_handle_dns_request(this.clone(), &req_list).await;
|
|
|
- req_list.clear();
|
|
|
- batch_timer = None;
|
|
|
+ count = data_rx.recv_many(&mut recv_buffer, recv_buffer_size) => {
|
|
|
+ if count <= 0 {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ req_list.extend(recv_buffer.drain(0..count));
|
|
|
+
|
|
|
+ if batch_mode {
|
|
|
+ if req_list.len() >= 1 && batch_timer.is_none() {
|
|
|
+ batch_timer = Some(tokio::time::interval_at(
|
|
|
+ Instant::now() + Duration::from_millis(500),
|
|
|
+ Duration::from_secs(1),
|
|
|
+ ));
|
|
|
}
|
|
|
- None => {
|
|
|
+
|
|
|
+ if req_list.len() < req_list_size - recv_buffer_size {
|
|
|
continue;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ DataServer::data_server_handle_dns_request(this.clone(), &req_list).await;
|
|
|
+ req_list.clear();
|
|
|
+ batch_timer = None;
|
|
|
}
|
|
|
}
|
|
|
}
|