data_server.rs 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822
  1. /*************************************************************************
  2. *
  3. * Copyright (C) 2018-2025 Ruilin Peng (Nick) <[email protected]>.
  4. *
  5. * smartdns is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * smartdns is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. */
  18. use crate::data_stats::*;
  19. use crate::data_upstream_server::UpstreamServerInfo;
  20. use crate::db::*;
  21. use crate::dns_log;
  22. use crate::plugin::SmartdnsPlugin;
  23. use crate::server_log::ServerAuditLog;
  24. use crate::server_log::ServerAuditLogMsg;
  25. use crate::server_log::ServerLog;
  26. use crate::server_log::ServerLogMsg;
  27. use crate::smartdns;
  28. use crate::smartdns::*;
  29. use crate::utils;
  30. use crate::whois;
  31. use crate::whois::WhoIsInfo;
  32. use std::collections::HashMap;
  33. use std::error::Error;
  34. use std::sync::atomic::AtomicBool;
  35. use std::sync::Weak;
  36. use std::sync::{Arc, Mutex, RwLock};
  37. use tokio::sync::mpsc;
  38. use tokio::task::JoinHandle;
  39. use tokio::time::Duration;
  40. use tokio::time::Instant;
  /// Default retention period for query logs, in seconds (30 days).
  pub const DEFAULT_MAX_LOG_AGE: u64 = 30 * 24 * 60 * 60;
  /// [`DEFAULT_MAX_LOG_AGE`] converted to milliseconds.
  pub const DEFAULT_MAX_LOG_AGE_MS: u64 = DEFAULT_MAX_LOG_AGE * 1000;
  /// Smallest accepted value for the configured max log age (same unit as
  /// [`DEFAULT_MAX_LOG_AGE`]).
  pub const MAX_LOG_AGE_VALUE_MIN: u64 = 600;
  /// Largest accepted value for the configured max log age (ten 365-day years).
  pub const MAX_LOG_AGE_VALUE_MAX: u64 = 365 * 24 * 60 * 60 * 10;
  /// Free-space threshold below which request handling is suspended
  /// (8 * 1024 * 1024; presumably bytes -- confirm against
  /// `utils::get_free_disk_space`).
  pub const MIN_FREE_DISK_SPACE: u64 = 1024 * 1024 * 8;
  /// File name of the database inside the data directory.
  pub const DB_FILE_NAME: &str = "smartdns.db";
  /// Snapshot of general server state returned by `DataServer::get_overview`.
  #[derive(Clone, Debug)]
  pub struct OverviewData {
      /// Name reported by `smartdns_get_server_name()`.
      pub server_name: String,
      /// Database size as reported by the DB layer.
      pub db_size: u64,
      /// Time captured when the data server object was created
      /// (taken from `get_utc_time_ms()`).
      pub startup_timestamp: u64,
      /// Free space available at the location of the database file.
      pub free_disk_space: u64,
      /// `true` while request handling is disabled.
      pub is_process_suspended: bool,
  }
  /// Snapshot of runtime counters returned by `DataServer::get_metrics`.
  #[derive(Clone, Debug)]
  pub struct MetricsData {
      /// Total number of recorded queries.
      pub total_query_count: u64,
      /// Number of recorded queries flagged as blocked.
      pub block_query_count: u64,
      /// Number of requests dropped because they could not be queued.
      pub request_drop_count: u64,
      /// Number of recorded queries with a non-zero reply code.
      pub fail_query_count: u64,
      /// Average processing time as reported by `smartdns::Stats`.
      pub avg_query_time: f64,
      /// Cache hit rate as reported by `smartdns::Stats`.
      pub cache_hit_rate: f64,
      /// Number of entries currently in the DNS cache.
      pub cache_number: u64,
      /// Memory used by the DNS cache as reported by `smartdns::Stats`.
      pub cache_memory_size: u64,
      /// Current queries-per-second figure.
      pub qps: u32,
      /// Memory usage as reported by the statistics module.
      pub memory_usage: u64,
      /// `true` while request handling is disabled.
      pub is_metrics_suspended: bool,
  }
  /// Settings used by the data server.
  #[derive(Clone, Debug)]
  pub struct DataServerConfig {
      /// Full path of the database file.
      pub db_file: String,
      /// Directory that holds the database file.
      pub data_path: String,
      /// Maximum age of stored query logs, in milliseconds.
      pub max_log_age_ms: u64,
  }
  75. impl DataServerConfig {
  76. pub fn new() -> Self {
  77. DataServerConfig {
  78. data_path: Plugin::dns_conf_data_dir(),
  79. db_file: Plugin::dns_conf_data_dir() + "/" + DB_FILE_NAME,
  80. max_log_age_ms: DEFAULT_MAX_LOG_AGE_MS,
  81. }
  82. }
  83. pub fn load_config(&mut self, data_server: Arc<DataServer>) -> Result<(), Box<dyn Error>> {
  84. self.max_log_age_ms = utils::parse_value(
  85. data_server.get_server_config("smartdns-ui.max-query-log-age"),
  86. MAX_LOG_AGE_VALUE_MIN,
  87. MAX_LOG_AGE_VALUE_MAX,
  88. DEFAULT_MAX_LOG_AGE,
  89. ) * 1000;
  90. let log_level = data_server.get_server_config("log-level");
  91. if let Some(log_level) = log_level {
  92. let log_level = log_level.try_into();
  93. match log_level {
  94. Ok(log_level) => {
  95. dns_log_set_level(log_level);
  96. }
  97. Err(_) => {
  98. dns_log!(LogLevel::WARN, "log level is invalid");
  99. }
  100. }
  101. }
  102. Ok(())
  103. }
  104. }
  105. pub struct DataServerControl {
  106. data_server: Arc<DataServer>,
  107. server_thread: Mutex<Option<JoinHandle<()>>>,
  108. is_init: Mutex<bool>,
  109. is_run: Mutex<bool>,
  110. plugin: Mutex<Weak<SmartdnsPlugin>>,
  111. }
  112. impl DataServerControl {
  113. pub fn new() -> Self {
  114. DataServerControl {
  115. data_server: Arc::new(DataServer::new()),
  116. server_thread: Mutex::new(None),
  117. is_init: Mutex::new(false),
  118. is_run: Mutex::new(false),
  119. plugin: Mutex::new(Weak::new()),
  120. }
  121. }
  122. pub fn get_data_server(&self) -> Arc<DataServer> {
  123. Arc::clone(&self.data_server)
  124. }
  125. pub fn set_plugin(&self, plugin: Arc<SmartdnsPlugin>) {
  126. *self.plugin.lock().unwrap() = Arc::downgrade(&plugin);
  127. }
  128. pub fn get_plugin(&self) -> Result<Arc<SmartdnsPlugin>, Box<dyn Error>> {
  129. let plugin = match self.plugin.lock() {
  130. Ok(plugin) => plugin,
  131. Err(_) => return Err("Failed to lock plugin mutex".into()),
  132. };
  133. if let Some(plugin) = plugin.upgrade() {
  134. return Ok(plugin);
  135. }
  136. Err("Plugin is not set".into())
  137. }
  138. pub fn init_db(&self, conf: &DataServerConfig) -> Result<(), Box<dyn Error>> {
  139. let inner_clone = Arc::clone(&self.data_server);
  140. let ret = inner_clone.init_server(conf);
  141. if let Err(e) = ret {
  142. return Err(e);
  143. }
  144. *self.is_init.lock().unwrap() = true;
  145. Ok(())
  146. }
  147. pub fn start_data_server(&self) -> Result<(), Box<dyn Error>> {
  148. let inner_clone = Arc::clone(&self.data_server);
  149. if *self.is_init.lock().unwrap() == false {
  150. return Err("data server not init".into());
  151. }
  152. let plugin = self.get_plugin()?;
  153. self.data_server.set_plugin(plugin.clone());
  154. let rt = plugin.get_runtime();
  155. let server_thread = rt.spawn(async move {
  156. let ret = DataServer::data_server_loop(inner_clone).await;
  157. if let Err(e) = ret {
  158. dns_log!(LogLevel::ERROR, "data server error: {}", e);
  159. Plugin::smartdns_exit(1);
  160. }
  161. dns_log!(LogLevel::DEBUG, "data server exit.");
  162. });
  163. *self.is_run.lock().unwrap() = true;
  164. *self.server_thread.lock().unwrap() = Some(server_thread);
  165. Ok(())
  166. }
  167. pub fn stop_data_server(&self) {
  168. if *self.is_run.lock().unwrap() == false {
  169. return;
  170. }
  171. self.data_server.stop_data_server();
  172. let _server_thread = self.server_thread.lock().unwrap().take();
  173. if let Some(server_thread) = _server_thread {
  174. let plugin = self.get_plugin();
  175. if plugin.is_err() {
  176. dns_log!(
  177. LogLevel::ERROR,
  178. "get plugin error: {}",
  179. plugin.err().unwrap()
  180. );
  181. return;
  182. }
  183. let plugin = plugin.unwrap();
  184. let rt = plugin.get_runtime();
  185. tokio::task::block_in_place(|| {
  186. if let Err(e) = rt.block_on(server_thread) {
  187. dns_log!(LogLevel::ERROR, "http server stop error: {}", e);
  188. }
  189. });
  190. }
  191. *self.is_run.lock().unwrap() = false;
  192. }
  193. pub fn send_request(&self, request: Box<dyn DnsRequest>) -> Result<(), Box<dyn Error>> {
  194. if request.is_prefetch_request() {
  195. return Ok(());
  196. }
  197. self.data_server.get_stat().add_qps_count(1);
  198. if self.data_server.is_handle_request_disabled() {
  199. return Ok(());
  200. }
  201. if let Some(tx) = self.data_server.data_tx.as_ref() {
  202. let ret = tx.try_send(request);
  203. if let Err(e) = ret {
  204. self.data_server.get_stat().add_request_drop(1);
  205. return Err(e.to_string().into());
  206. }
  207. }
  208. Ok(())
  209. }
  210. pub fn server_log(&self, level: LogLevel, msg: &str, msg_len: i32) {
  211. self.data_server.server_log(level, msg, msg_len);
  212. }
  213. pub fn server_audit_log(&self, msg: &str, msg_len: i32) {
  214. self.data_server.server_audit_log(msg, msg_len);
  215. }
  216. }
  217. impl Drop for DataServerControl {
  218. fn drop(&mut self) {
  219. self.stop_data_server();
  220. }
  221. }
  222. pub struct DataServer {
  223. conf: Arc<RwLock<DataServerConfig>>,
  224. notify_tx: Option<mpsc::Sender<()>>,
  225. notify_rx: Mutex<Option<mpsc::Receiver<()>>>,
  226. data_tx: Option<mpsc::Sender<Box<dyn DnsRequest>>>,
  227. data_rx: Mutex<Option<mpsc::Receiver<Box<dyn DnsRequest>>>>,
  228. db: Arc<DB>,
  229. disable_handle_request: AtomicBool,
  230. stat: Arc<DataStats>,
  231. server_log: ServerLog,
  232. server_audit_log: ServerAuditLog,
  233. plugin: Mutex<Weak<SmartdnsPlugin>>,
  234. whois: whois::WhoIs,
  235. startup_timestamp: u64,
  236. recv_in_batch: Mutex<bool>,
  237. }
  238. impl DataServer {
  239. pub fn new() -> Self {
  240. let db = Arc::new(DB::new());
  241. let conf = Arc::new(RwLock::new(DataServerConfig::new()));
  242. let mut plugin = DataServer {
  243. conf: conf.clone(),
  244. notify_tx: None,
  245. notify_rx: Mutex::new(None),
  246. data_tx: None,
  247. data_rx: Mutex::new(None),
  248. db: db.clone(),
  249. stat: DataStats::new(db, conf.clone()),
  250. server_log: ServerLog::new(),
  251. server_audit_log: ServerAuditLog::new(),
  252. plugin: Mutex::new(Weak::new()),
  253. whois: whois::WhoIs::new(),
  254. startup_timestamp: get_utc_time_ms(),
  255. disable_handle_request: AtomicBool::new(false),
  256. recv_in_batch: Mutex::new(true),
  257. };
  258. let (tx, rx) = mpsc::channel(100);
  259. plugin.notify_tx = Some(tx);
  260. plugin.notify_rx = Mutex::new(Some(rx));
  261. let (tx, rx) = mpsc::channel(1024 * 256);
  262. plugin.data_tx = Some(tx);
  263. plugin.data_rx = Mutex::new(Some(rx));
  264. plugin
  265. }
  266. pub fn get_recv_in_batch(&self) -> bool {
  267. *self.recv_in_batch.lock().unwrap()
  268. }
  269. pub fn set_recv_in_batch(&self, recv_in_batch: bool) {
  270. *self.recv_in_batch.lock().unwrap() = recv_in_batch;
  271. }
  272. fn init_server(&self, conf: &DataServerConfig) -> Result<(), Box<dyn Error>> {
  273. let mut conf_clone = self.conf.write().unwrap();
  274. *conf_clone = conf.clone();
  275. smartdns::smartdns_enable_update_neighbour(true);
  276. if utils::is_dir_writable(&conf_clone.data_path) == false {
  277. return Err(format!(
  278. "data path '{}' is not exist or writable.",
  279. conf_clone.data_path
  280. )
  281. .into());
  282. }
  283. conf_clone.db_file = conf_clone.data_path.clone() + "/" + DB_FILE_NAME;
  284. dns_log!(LogLevel::INFO, "open db: {}", conf_clone.db_file);
  285. let ret = self.db.open(&conf_clone.db_file);
  286. if let Err(e) = ret {
  287. return Err(e);
  288. }
  289. let ret = self.stat.init();
  290. if let Err(e) = ret {
  291. return Err(e);
  292. }
  293. Ok(())
  294. }
  295. pub fn set_plugin(&self, plugin: Arc<SmartdnsPlugin>) {
  296. *self.plugin.lock().unwrap() = Arc::downgrade(&plugin);
  297. }
  298. pub fn get_plugin(&self) -> Result<Arc<SmartdnsPlugin>, Box<dyn Error>> {
  299. let plugin = match self.plugin.lock() {
  300. Ok(plugin) => plugin,
  301. Err(_) => return Err("Failed to lock plugin mutex".into()),
  302. };
  303. if let Some(plugin) = plugin.upgrade() {
  304. return Ok(plugin);
  305. }
  306. Err("Plugin is not set".into())
  307. }
  308. pub fn get_data_server_config(&self) -> DataServerConfig {
  309. let conf = self.conf.read().unwrap();
  310. conf.clone()
  311. }
  312. pub fn get_config(&self, key: &str) -> Option<String> {
  313. let ret = self.db.get_config(key);
  314. if let Ok(value) = ret {
  315. return value;
  316. }
  317. None
  318. }
  319. pub fn get_server_config_from_file(&self, key: &str) -> Option<String> {
  320. let ret = Plugin::dns_conf_plugin_config(key);
  321. if let Some(value) = ret {
  322. return Some(value);
  323. }
  324. None
  325. }
  326. pub fn get_server_config(&self, key: &str) -> Option<String> {
  327. let ret = self.get_config(key);
  328. if let Some(value) = ret {
  329. return Some(value);
  330. }
  331. let ret = Plugin::dns_conf_plugin_config(key);
  332. if let Some(value) = ret {
  333. return Some(value);
  334. }
  335. None
  336. }
  337. pub async fn whois(&self, domain: &str) -> Result<WhoIsInfo, Box<dyn Error>> {
  338. self.whois.query(domain).await
  339. }
  340. pub fn get_config_list(&self) -> Result<HashMap<String, String>, Box<dyn Error>> {
  341. self.db.get_config_list()
  342. }
  343. pub fn set_config(&self, key: &str, value: &str) -> Result<(), Box<dyn Error>> {
  344. self.db.set_config(key, value)
  345. }
  346. pub fn get_upstream_server_list(&self) -> Result<Vec<UpstreamServerInfo>, Box<dyn Error>> {
  347. let servers = UpstreamServerInfo::get_all()?;
  348. Ok(servers)
  349. }
  350. pub fn get_domain_list(
  351. &self,
  352. param: &DomainListGetParam,
  353. ) -> Result<QueryDomainListResult, Box<dyn Error>> {
  354. self.db.get_domain_list(Some(param))
  355. }
  356. pub fn get_domain_list_count(&self) -> u64 {
  357. self.db.get_domain_list_count(None)
  358. }
  359. pub fn delete_domain_by_id(&self, id: u64) -> Result<u64, Box<dyn Error>> {
  360. self.db.delete_domain_by_id(id)
  361. }
  362. pub fn delete_domain_before_timestamp(&self, timestamp: u64) -> Result<u64, Box<dyn Error>> {
  363. self.db.delete_domain_before_timestamp(timestamp)
  364. }
  365. pub fn delete_client_by_id(&self, id: u64) -> Result<u64, Box<dyn Error>> {
  366. self.db.delete_client_by_id(id)
  367. }
  368. pub fn get_client_list(
  369. &self,
  370. param: &ClientListGetParam,
  371. ) -> Result<QueryClientListResult, Box<dyn Error>> {
  372. self.db.get_client_list(Some(param))
  373. }
  374. pub fn get_top_client_top_list(
  375. &self,
  376. count: Option<u32>,
  377. ) -> Result<Vec<ClientQueryCount>, Box<dyn Error>> {
  378. self.db.get_client_top_list(count.unwrap_or(10))
  379. }
  380. pub fn get_top_domain_top_list(
  381. &self,
  382. count: Option<u32>,
  383. ) -> Result<Vec<DomainQueryCount>, Box<dyn Error>> {
  384. self.db.get_domain_top_list(count.unwrap_or(10))
  385. }
  386. pub fn get_hourly_query_count(
  387. &self,
  388. past_hours: Option<u32>,
  389. ) -> Result<HourlyQueryCount, Box<dyn Error>> {
  390. self.db.get_hourly_query_count(past_hours.unwrap_or(24))
  391. }
  392. pub fn get_daily_query_count(
  393. &self,
  394. past_days: Option<u32>,
  395. ) -> Result<DailyQueryCount, Box<dyn Error>> {
  396. self.db.get_daily_query_count(past_days.unwrap_or(30))
  397. }
  398. pub fn get_stat(&self) -> Arc<DataStats> {
  399. self.stat.clone()
  400. }
  401. pub fn get_metrics(&self) -> Result<MetricsData, Box<dyn Error + Send>> {
  402. let metrics = MetricsData {
  403. total_query_count: self.stat.get_total_request(),
  404. block_query_count: self.stat.get_total_blocked_request(),
  405. request_drop_count: self.stat.get_request_drop(),
  406. fail_query_count: self.stat.get_total_failed_request(),
  407. avg_query_time: smartdns::Stats::get_avg_process_time(),
  408. cache_hit_rate: smartdns::Stats::get_cache_hit_rate(),
  409. cache_number: smartdns::Plugin::dns_cache_total_num() as u64,
  410. cache_memory_size: smartdns::Stats::get_cache_memory_size(),
  411. qps: self.stat.get_qps(),
  412. memory_usage: self.stat.get_memory_usage(),
  413. is_metrics_suspended: self.is_handle_request_disabled(),
  414. };
  415. Ok(metrics)
  416. }
  417. pub fn is_handle_request_disabled(&self) -> bool {
  418. self.disable_handle_request
  419. .load(std::sync::atomic::Ordering::Relaxed)
  420. }
  421. pub fn get_free_disk_space(&self) -> u64 {
  422. utils::get_free_disk_space(&self.get_data_server_config().db_file)
  423. }
  424. pub fn get_overview(&self) -> Result<OverviewData, Box<dyn Error + Send>> {
  425. let overview = OverviewData {
  426. server_name: smartdns::smartdns_get_server_name(),
  427. db_size: self.db.get_db_size(),
  428. startup_timestamp: self.startup_timestamp,
  429. free_disk_space: self.get_free_disk_space(),
  430. is_process_suspended: self.is_handle_request_disabled(),
  431. };
  432. Ok(overview)
  433. }
  434. pub fn insert_client_by_list(&self, data: &Vec<ClientData>) -> Result<(), Box<dyn Error>> {
  435. self.db.insert_client(data)
  436. }
  437. pub fn insert_domain_by_list(&self, data: &Vec<DomainData>) -> Result<(), Box<dyn Error>> {
  438. self.db.insert_domain(data)
  439. }
  440. pub fn insert_domain(&self, data: &DomainData) -> Result<(), Box<dyn Error>> {
  441. let list = vec![data.clone()];
  442. self.stat.add_total_request(1);
  443. if data.is_blocked {
  444. self.stat.add_total_blocked_request(1);
  445. }
  446. if data.reply_code != 0 {
  447. self.stat.add_total_failed_request(1);
  448. }
  449. self.db.insert_domain(&list)
  450. }
  451. async fn data_server_handle_dns_request(
  452. this: Arc<DataServer>,
  453. req_list: &Vec<Box<dyn DnsRequest>>,
  454. ) {
  455. let mut domain_data_list = Vec::new();
  456. let mut client_data_list = Vec::new();
  457. let mut blocked_num = 0;
  458. let mut failed_num = 0;
  459. let timestamp_now = get_utc_time_ms();
  460. for req in req_list {
  461. if req.is_prefetch_request() {
  462. continue;
  463. }
  464. if req.is_dualstack_request() {
  465. continue;
  466. }
  467. if req.get_is_blocked() {
  468. blocked_num += 1;
  469. }
  470. if req.get_rcode() != 0 {
  471. failed_num += 1;
  472. }
  473. let domain_data = DomainData {
  474. id: 0,
  475. domain: req.get_domain(),
  476. domain_type: req.get_qtype(),
  477. client: req.get_remote_addr(),
  478. domain_group: req.get_group_name(),
  479. reply_code: req.get_rcode(),
  480. timestamp: req.get_query_timestamp(),
  481. query_time: req.get_query_time(),
  482. ping_time: req.get_ping_time(),
  483. is_blocked: req.get_is_blocked(),
  484. is_cached: req.get_is_cached(),
  485. };
  486. dns_log!(
  487. LogLevel::DEBUG,
  488. "insert domain:{}, type:{}",
  489. domain_data.domain,
  490. domain_data.domain_type
  491. );
  492. domain_data_list.push(domain_data);
  493. let mac_str = req
  494. .get_remote_mac()
  495. .iter()
  496. .map(|byte| format!("{:02x}", byte))
  497. .collect::<Vec<String>>()
  498. .join(":");
  499. let client_data = ClientData {
  500. id: 0,
  501. client_ip: req.get_remote_addr(),
  502. hostname: "".to_string(),
  503. mac: mac_str,
  504. last_query_timestamp: timestamp_now,
  505. };
  506. client_data_list.push(client_data);
  507. }
  508. this.stat.add_total_request(domain_data_list.len() as u64);
  509. this.stat.add_total_blocked_request(blocked_num as u64);
  510. this.stat.add_total_failed_request(failed_num as u64);
  511. dns_log!(
  512. LogLevel::DEBUG,
  513. "insert domain list count:{}",
  514. domain_data_list.len()
  515. );
  516. let ret = DataServer::call_blocking(this.clone(), move || {
  517. let _ = match this.insert_domain_by_list(&domain_data_list) {
  518. Ok(v) => v,
  519. Err(e) => return Err(e.to_string()),
  520. };
  521. let ret = match this.insert_client_by_list(&client_data_list) {
  522. Ok(v) => v,
  523. Err(e) => return Err(e.to_string()),
  524. };
  525. Ok(ret)
  526. })
  527. .await;
  528. if let Err(e) = ret {
  529. dns_log!(LogLevel::ERROR, "insert domain error: {}", e);
  530. return;
  531. }
  532. let ret = ret.unwrap();
  533. if let Err(e) = ret {
  534. dns_log!(LogLevel::ERROR, "insert domain error: {}", e);
  535. }
  536. }
  537. pub async fn get_log_stream(&self) -> mpsc::Receiver<ServerLogMsg> {
  538. return self.server_log.get_log_stream().await;
  539. }
  540. pub fn server_log(&self, level: LogLevel, msg: &str, msg_len: i32) {
  541. self.server_log.dispatch_log(level, msg, msg_len);
  542. }
  543. pub async fn get_audit_log_stream(&self) -> mpsc::Receiver<ServerAuditLogMsg> {
  544. return self.server_audit_log.get_audit_log_stream().await;
  545. }
  546. pub fn server_audit_log(&self, msg: &str, msg_len: i32) {
  547. self.server_audit_log.dispatch_audit_log(msg, msg_len);
  548. }
  549. fn server_check(&self) {
  550. let free_disk_space = self.get_free_disk_space();
  551. if free_disk_space < MIN_FREE_DISK_SPACE {
  552. if self
  553. .disable_handle_request
  554. .fetch_or(true, std::sync::atomic::Ordering::Relaxed)
  555. {
  556. return;
  557. }
  558. dns_log!(
  559. LogLevel::WARN,
  560. "free disk space is low, stop handle request. {}",
  561. self.disable_handle_request
  562. .load(std::sync::atomic::Ordering::Relaxed)
  563. );
  564. } else {
  565. if !self
  566. .disable_handle_request
  567. .load(std::sync::atomic::Ordering::Relaxed)
  568. {
  569. return;
  570. }
  571. self.disable_handle_request
  572. .store(false, std::sync::atomic::Ordering::Relaxed);
  573. dns_log!(
  574. LogLevel::INFO,
  575. "free disk space is enough, start handle request."
  576. );
  577. }
  578. }
  579. async fn data_server_loop(this: Arc<DataServer>) -> Result<(), Box<dyn Error>> {
  580. let mut rx: mpsc::Receiver<()>;
  581. let mut data_rx: mpsc::Receiver<Box<dyn DnsRequest>>;
  582. let batch_mode = *this.recv_in_batch.lock().unwrap();
  583. {
  584. let mut _rx = this.notify_rx.lock().unwrap();
  585. rx = _rx.take().unwrap();
  586. let mut _rx = this.data_rx.lock().unwrap();
  587. data_rx = _rx.take().unwrap();
  588. }
  589. this.stat.clone().start_worker()?;
  590. let req_list_size = if batch_mode { 1024 * 32 } else { 1 };
  591. let mut req_list: Vec<Box<dyn DnsRequest>> = Vec::with_capacity(req_list_size);
  592. let batch_size = if batch_mode { 1024 * 8 } else { 1 };
  593. let mut recv_buffer = Vec::with_capacity(batch_size);
  594. let mut batch_timer: Option<tokio::time::Interval> = None;
  595. let mut check_timer = tokio::time::interval(Duration::from_secs(60));
  596. let is_check_timer_running = Arc::new(AtomicBool::new(false));
  597. dns_log!(LogLevel::DEBUG, "data server start.");
  598. loop {
  599. tokio::select! {
  600. _ = rx.recv() => {
  601. break;
  602. }
  603. _ = check_timer.tick() => {
  604. if is_check_timer_running.fetch_xor(true, std::sync::atomic::Ordering::Relaxed) {
  605. continue;
  606. }
  607. let is_check_timer_running_clone = is_check_timer_running.clone();
  608. let this_clone = this.clone();
  609. let ret = DataServer::call_blocking(this.clone(), move || {
  610. this_clone.server_check();
  611. is_check_timer_running_clone.store(false, std::sync::atomic::Ordering::Relaxed);
  612. }).await;
  613. if let Err(e) = ret {
  614. dns_log!(LogLevel::WARN, "data server check error: {}", e);
  615. }
  616. }
  617. _ = async {
  618. if let Some(ref mut timer) = batch_timer {
  619. timer.tick().await;
  620. }
  621. }, if batch_timer.is_some() => {
  622. batch_timer = None;
  623. DataServer::data_server_handle_dns_request(this.clone(), &req_list).await;
  624. req_list.clear();
  625. }
  626. count = data_rx.recv_many(&mut recv_buffer, batch_size) => {
  627. if count <= 0 {
  628. continue;
  629. }
  630. req_list.extend(recv_buffer.drain(0..count));
  631. if batch_mode {
  632. if req_list.len() >= 1 && batch_timer.is_none() {
  633. let fill = (req_list.len() as f32 / batch_size as f32)
  634. .max(0.0)
  635. .min(1.0);
  636. let delay_ms = (1000.0 - 990.0 * fill) as u64;
  637. batch_timer = Some(tokio::time::interval_at(
  638. Instant::now() + Duration::from_millis(delay_ms),
  639. Duration::from_secs(2),
  640. ));
  641. }
  642. if req_list.len() < batch_size {
  643. continue;
  644. }
  645. }
  646. batch_timer = None;
  647. DataServer::data_server_handle_dns_request(this.clone(), &req_list).await;
  648. req_list.clear();
  649. }
  650. }
  651. }
  652. this.stat.clone().stop_worker();
  653. Ok(())
  654. }
  655. fn stop_data_server(&self) {
  656. if let Some(tx) = self.notify_tx.as_ref().cloned() {
  657. let plugin = match self.get_plugin() {
  658. Ok(plugin) => plugin,
  659. Err(e) => {
  660. dns_log!(LogLevel::ERROR, "get plugin error: {}", e);
  661. return;
  662. }
  663. };
  664. let rt = plugin.get_runtime();
  665. tokio::task::block_in_place(|| {
  666. let _ = rt.block_on(async {
  667. let _ = tx.send(()).await;
  668. });
  669. });
  670. }
  671. }
  672. async fn call_blocking<F, R>(
  673. this: Arc<DataServer>,
  674. func: F,
  675. ) -> Result<R, Box<dyn std::error::Error + Send>>
  676. where
  677. F: FnOnce() -> R + Send + 'static,
  678. R: Send + 'static,
  679. {
  680. let rt = this.get_plugin().unwrap().get_runtime();
  681. let ret = rt.spawn_blocking(move || -> R {
  682. return func();
  683. });
  684. let ret = ret.await;
  685. if ret.is_err() {
  686. return Err(Box::new(ret.err().unwrap()));
  687. }
  688. let ret = ret.unwrap();
  689. return Ok(ret);
  690. }
  691. }