db.rs 52 KB


  1. /*************************************************************************
  2. *
  3. * Copyright (C) 2018-2025 Ruilin Peng (Nick) <[email protected]>.
  4. *
  5. * smartdns is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * smartdns is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. */
  18. use crate::dns_log;
  19. use crate::smartdns;
  20. use crate::smartdns::*;
  21. use crate::utils;
  22. use std::collections::HashMap;
  23. use std::error::Error;
  24. use std::fs;
  25. use std::sync::Mutex;
  26. use std::vec;
  27. use chrono::Local;
  28. use rusqlite::Transaction;
  29. use rusqlite::{Connection, OpenFlags, Result};
  30. pub struct DB {
  31. conn: Mutex<Option<Connection>>,
  32. version: i32,
  33. query_plan: bool,
  34. }
  35. #[derive(Debug, Clone)]
  36. pub struct ClientData {
  37. pub id: u32,
  38. pub hostname: String,
  39. pub client_ip: String,
  40. pub mac: String,
  41. pub last_query_timestamp: u64,
  42. }
  43. #[derive(Debug, Clone)]
  44. pub struct ClientQueryCount {
  45. pub client_ip: String,
  46. pub count: u32,
  47. pub timestamp_start: u64,
  48. pub timestamp_end: u64,
  49. }
  50. #[derive(Debug, Clone)]
  51. pub struct DomainQueryCount {
  52. pub domain: String,
  53. pub count: u32,
  54. pub timestamp_start: u64,
  55. pub timestamp_end: u64,
  56. }
  57. #[derive(Debug, Clone)]
  58. pub struct HourlyQueryCountItem {
  59. pub hour: String,
  60. pub query_count: u32,
  61. }
  62. #[derive(Debug, Clone)]
  63. pub struct HourlyQueryCount {
  64. pub query_timestamp: u64,
  65. pub hourly_query_count: Vec<HourlyQueryCountItem>,
  66. }
  67. #[derive(Debug, Clone)]
  68. pub struct DailyQueryCountItem {
  69. pub day: String,
  70. pub query_count: u32,
  71. }
  72. #[derive(Debug, Clone)]
  73. pub struct DailyQueryCount {
  74. pub query_timestamp: u64,
  75. pub daily_query_count: Vec<DailyQueryCountItem>,
  76. }
  77. #[derive(Debug, Clone)]
  78. pub struct DomainData {
  79. pub id: u64,
  80. pub timestamp: u64,
  81. pub domain: String,
  82. pub domain_type: u32,
  83. pub client: String,
  84. pub domain_group: String,
  85. pub reply_code: u16,
  86. pub query_time: i32,
  87. pub ping_time: f64,
  88. pub is_blocked: bool,
  89. pub is_cached: bool,
  90. }
  91. #[derive(Debug, Clone)]
  92. pub struct QueryDomainListResult {
  93. pub domain_list: Vec<DomainData>,
  94. pub total_count: u64,
  95. pub step_by_cursor: bool,
  96. }
  97. #[derive(Debug, Clone)]
  98. pub struct DomainListGetParamCursor {
  99. pub id: Option<u64>,
  100. pub total_count: u64,
  101. pub direction: String,
  102. }
  103. #[derive(Debug, Clone)]
  104. pub struct QueryClientListResult {
  105. pub client_list: Vec<ClientData>,
  106. pub total_count: u64,
  107. pub step_by_cursor: bool,
  108. }
  109. #[derive(Debug, Clone)]
  110. pub struct ClientListGetParamCursor {
  111. pub id: Option<u64>,
  112. pub total_count: u64,
  113. pub direction: String,
  114. }
  115. #[derive(Debug, Clone)]
  116. pub struct ClientListGetParam {
  117. pub id: Option<u64>,
  118. pub order: Option<String>,
  119. pub page_num: u64,
  120. pub page_size: u64,
  121. pub client_ip: Option<String>,
  122. pub mac: Option<String>,
  123. pub hostname: Option<String>,
  124. pub timestamp_before: Option<u64>,
  125. pub timestamp_after: Option<u64>,
  126. pub cursor: Option<ClientListGetParamCursor>,
  127. }
  128. impl ClientListGetParam {
  129. pub fn new() -> Self {
  130. ClientListGetParam {
  131. id: None,
  132. page_num: 1,
  133. order: None,
  134. page_size: 10,
  135. client_ip: None,
  136. mac: None,
  137. hostname: None,
  138. timestamp_before: None,
  139. timestamp_after: None,
  140. cursor: None,
  141. }
  142. }
  143. }
  144. #[derive(Debug, Clone)]
  145. pub struct DomainListGetParam {
  146. pub id: Option<u64>,
  147. pub order: Option<String>,
  148. pub page_num: u64,
  149. pub page_size: u64,
  150. pub domain: Option<String>,
  151. pub domain_filter_mode: Option<String>,
  152. pub domain_type: Option<u32>,
  153. pub client: Option<String>,
  154. pub domain_group: Option<String>,
  155. pub reply_code: Option<u16>,
  156. pub timestamp_before: Option<u64>,
  157. pub timestamp_after: Option<u64>,
  158. pub is_blocked: Option<bool>,
  159. pub is_cached: Option<bool>,
  160. pub cursor: Option<DomainListGetParamCursor>,
  161. }
  162. impl DomainListGetParam {
  163. pub fn new() -> Self {
  164. DomainListGetParam {
  165. id: None,
  166. page_num: 1,
  167. order: None,
  168. page_size: 10,
  169. domain: None,
  170. domain_filter_mode: None,
  171. domain_type: None,
  172. client: None,
  173. domain_group: None,
  174. reply_code: None,
  175. timestamp_before: None,
  176. timestamp_after: None,
  177. is_blocked: None,
  178. is_cached: None,
  179. cursor: None,
  180. }
  181. }
  182. }
  183. impl DB {
  184. pub fn new() -> Self {
  185. DB {
  186. conn: Mutex::new(None),
  187. version: 10000, /* x: major version, xx: minor version, xx: patch version */
  188. query_plan: std::env::var("SMARTDNS_DEBUG_SQL").is_ok(),
  189. }
  190. }
  191. fn create_table(&self, conn: &Connection) -> Result<()> {
  192. conn.execute(
  193. "CREATE TABLE IF NOT EXISTS domain (
  194. id INTEGER PRIMARY KEY AUTOINCREMENT,
  195. timestamp BIGINT NOT NULL,
  196. domain TEXT NOT NULL,
  197. domain_type INTEGER NOT NULL,
  198. client TEXT NOT NULL,
  199. domain_group TEXT NOT NULL,
  200. reply_code INTEGER NOT NULL,
  201. query_time INTEGER NOT NULL,
  202. ping_time REAL NOT NULL,
  203. is_blocked INTEGER DEFAULT 0,
  204. is_cached INTEGER DEFAULT 0
  205. )",
  206. [],
  207. )?;
  208. conn.execute(
  209. "CREATE INDEX IF NOT EXISTS idx_domain_timestamp ON domain (timestamp)",
  210. [],
  211. )?;
  212. conn.execute(
  213. "CREATE INDEX IF NOT EXISTS idx_domain_client ON domain (client)",
  214. [],
  215. )?;
  216. conn.execute(
  217. "CREATE TABLE IF NOT EXISTS domain_hourly_count (
  218. timestamp BIGINT PRIMARY KEY,
  219. count INTEGER DEFAULT 0
  220. );",
  221. [],
  222. )?;
  223. conn.execute(
  224. "CREATE TABLE IF NOT EXISTS domain_daily_count (
  225. timestamp BIGINT PRIMARY KEY,
  226. count INTEGER DEFAULT 0
  227. );",
  228. [],
  229. )?;
  230. conn.execute(
  231. "CREATE TABLE IF NOT EXISTS top_domain_list (
  232. domain TEXT PRIMARY KEY,
  233. count INTEGER DEFAULT 0,
  234. timestamp_start BIGINT DEFAULT 0,
  235. timestamp_end BIGINT DEFAULT 0
  236. );",
  237. [],
  238. )?;
  239. conn.execute(
  240. "CREATE TABLE IF NOT EXISTS top_client_list (
  241. client TEXT PRIMARY KEY,
  242. count INTEGER DEFAULT 0,
  243. timestamp_start BIGINT DEFAULT 0,
  244. timestamp_end BIGINT DEFAULT 0
  245. );",
  246. [],
  247. )?;
  248. conn.execute(
  249. "
  250. CREATE TABLE IF NOT EXISTS client (
  251. id INTEGER PRIMARY KEY,
  252. client_ip TEXT NOT NULL,
  253. mac TEXT NOT NULL,
  254. hostname TEXT NOT NULL,
  255. last_query_timestamp BIGINT NOT NULL,
  256. UNIQUE(client_ip, mac)
  257. )",
  258. [],
  259. )?;
  260. conn.execute(
  261. "CREATE INDEX IF NOT EXISTS idx_client_last_query_timestamp ON client (last_query_timestamp)",
  262. [],
  263. )?;
  264. conn.execute(
  265. "CREATE TABLE IF NOT EXISTS config (
  266. key TEXT PRIMARY KEY,
  267. value TEXT NOT NULL
  268. )",
  269. [],
  270. )?;
  271. conn.execute(
  272. "CREATE TABLE IF NOT EXISTS status_data (
  273. key TEXT PRIMARY KEY,
  274. value TEXT NOT NULL
  275. )",
  276. [],
  277. )?;
  278. conn.execute(
  279. "INSERT INTO schema_version (version) VALUES (?)",
  280. [self.version],
  281. )?;
  282. Ok(())
  283. }
  284. fn migrate_db(&self, _conn: &Connection) -> Result<(), Box<dyn Error>> {
  285. return Err(
  286. "Currently Not Support Migrate Database, Please Backup DB File, And Restart Server."
  287. .into(),
  288. );
  289. }
  290. fn init_db(&self, conn: &Connection) -> Result<(), Box<dyn Error>> {
  291. conn.execute(
  292. "CREATE TABLE IF NOT EXISTS schema_version (
  293. version INTEGER PRIMARY KEY
  294. )",
  295. [],
  296. )?;
  297. let current_version: i32 = conn
  298. .query_row(
  299. "SELECT version FROM schema_version ORDER BY version DESC LIMIT 1",
  300. [],
  301. |row| row.get(0),
  302. )
  303. .unwrap_or(self.version);
  304. if current_version >= self.version {
  305. self.create_table(conn)?;
  306. } else {
  307. self.migrate_db(conn)?;
  308. }
  309. Ok(())
  310. }
  311. pub fn open(&self, path: &str) -> Result<(), Box<dyn Error>> {
  312. let ruconn: std::result::Result<Connection, rusqlite::Error> =
  313. Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_WRITE);
  314. let mut conn = self.conn.lock().unwrap();
  315. if let Err(_) = ruconn {
  316. let ruconn = Connection::open_with_flags(
  317. path,
  318. OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
  319. )?;
  320. let ret = self.init_db(&ruconn);
  321. if let Err(e) = ret {
  322. _ = ruconn.close();
  323. fs::remove_file(path)?;
  324. return Err(e);
  325. }
  326. *conn = Some(ruconn);
  327. } else {
  328. *conn = Some(ruconn.unwrap());
  329. }
  330. conn.as_ref()
  331. .unwrap()
  332. .execute("PRAGMA synchronous = OFF", [])?;
  333. conn.as_ref()
  334. .unwrap()
  335. .execute("PRAGMA page_size = 4096", [])?;
  336. conn.as_ref()
  337. .unwrap()
  338. .execute("PRAGMA cache_size = 10000", [])?;
  339. conn.as_ref()
  340. .unwrap()
  341. .execute("PRAGMA temp_store = MEMORY", [])?;
  342. conn.as_ref()
  343. .unwrap()
  344. .query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
  345. Ok(())
  346. }
  347. pub fn set_config(&self, key: &str, value: &str) -> Result<(), Box<dyn Error>> {
  348. let conn = self.conn.lock().unwrap();
  349. if conn.as_ref().is_none() {
  350. return Err("db is not open".into());
  351. }
  352. let conn = conn.as_ref().unwrap();
  353. let mut stmt =
  354. conn.prepare("INSERT OR REPLACE INTO config (key, value) VALUES (?1, ?2)")?;
  355. let ret = stmt.execute(&[&key, &value]);
  356. if let Err(e) = ret {
  357. return Err(Box::new(e));
  358. }
  359. Ok(())
  360. }
  361. pub fn get_config_list(&self) -> Result<HashMap<String, String>, Box<dyn Error>> {
  362. let mut ret = HashMap::new();
  363. let conn = self.conn.lock().unwrap();
  364. if conn.as_ref().is_none() {
  365. return Err("db is not open".into());
  366. }
  367. let conn = conn.as_ref().unwrap();
  368. let mut stmt = conn.prepare("SELECT key, value FROM config").unwrap();
  369. let rows = stmt.query_map([], |row| {
  370. let key: String = row.get(0)?;
  371. let value: String = row.get(1)?;
  372. Ok((key, value))
  373. });
  374. if let Ok(rows) = rows {
  375. for row in rows {
  376. if let Ok(row) = row {
  377. ret.insert(row.0, row.1);
  378. }
  379. }
  380. }
  381. Ok(ret)
  382. }
  383. pub fn set_status_data(&self, key: &str, value: &str) -> Result<(), Box<dyn Error>> {
  384. let conn = self.conn.lock().unwrap();
  385. if conn.as_ref().is_none() {
  386. return Err("db is not open".into());
  387. }
  388. let conn = conn.as_ref().unwrap();
  389. let mut stmt =
  390. conn.prepare("INSERT OR REPLACE INTO status_data (key, value) VALUES (?1, ?2)")?;
  391. let ret = stmt.execute(&[&key, &value]);
  392. if let Err(e) = ret {
  393. return Err(Box::new(e));
  394. }
  395. Ok(())
  396. }
  397. pub fn get_status_data_list(&self) -> Result<HashMap<String, String>, Box<dyn Error>> {
  398. let mut ret = HashMap::new();
  399. let conn = self.conn.lock().unwrap();
  400. if conn.as_ref().is_none() {
  401. return Err("db is not open".into());
  402. }
  403. let conn = conn.as_ref().unwrap();
  404. let stmt = conn.prepare("SELECT key, value FROM status_data");
  405. if let Err(e) = stmt {
  406. return Err(Box::new(e));
  407. }
  408. let mut stmt = stmt.unwrap();
  409. let rows = stmt.query_map([], |row| {
  410. let key: String = row.get(0)?;
  411. let value: String = row.get(1)?;
  412. Ok((key, value))
  413. });
  414. if let Ok(rows) = rows {
  415. for row in rows {
  416. if let Ok(row) = row {
  417. ret.insert(row.0, row.1);
  418. }
  419. }
  420. }
  421. Ok(ret)
  422. }
  423. pub fn debug_query_plan(&self, conn: &Connection, sql: String, sql_param: &Vec<String>) {
  424. if !self.query_plan {
  425. return;
  426. }
  427. let sqlplan = "EXPLAIN QUERY PLAN ".to_string() + &sql;
  428. let stmt = conn.prepare(sqlplan.as_str());
  429. if let Err(e) = stmt {
  430. dns_log!(LogLevel::DEBUG, "query plan sql error: {}", e);
  431. return;
  432. }
  433. let mut stmt = stmt.unwrap();
  434. let plan_rows = stmt.query_map(rusqlite::params_from_iter(sql_param.clone()), |row| {
  435. Ok(row.get::<_, String>(3)?)
  436. });
  437. if let Err(e) = plan_rows {
  438. dns_log!(LogLevel::DEBUG, "query plan error: {}", e);
  439. return;
  440. }
  441. let plan_rows = plan_rows.unwrap();
  442. dns_log!(LogLevel::NOTICE, "sql: {}", sql);
  443. for plan in plan_rows {
  444. if let Ok(plan) = plan {
  445. dns_log!(LogLevel::NOTICE, "plan: {}", plan);
  446. }
  447. }
  448. }
  449. pub fn get_config(&self, key: &str) -> Result<Option<String>, Box<dyn Error>> {
  450. let conn = self.conn.lock().unwrap();
  451. if conn.as_ref().is_none() {
  452. return Err("db is not open".into());
  453. }
  454. let conn = conn.as_ref().unwrap();
  455. let mut stmt = conn
  456. .prepare("SELECT value FROM config WHERE key = ?")
  457. .unwrap();
  458. let rows = stmt.query_map(&[&key], |row| Ok(row.get(0)?));
  459. if let Ok(rows) = rows {
  460. for row in rows {
  461. if let Ok(row) = row {
  462. return Ok(Some(row));
  463. }
  464. }
  465. }
  466. Ok(None)
  467. }
  468. pub fn update_domain_hourly_count(
  469. &self,
  470. tx: &Transaction<'_>,
  471. hourly_count: &HashMap<u64, u32>,
  472. ) -> Result<(), Box<dyn Error>> {
  473. let mut stmt = tx.prepare(
  474. "INSERT INTO domain_hourly_count (timestamp, count)
  475. VALUES (
  476. ?1,
  477. ?2
  478. )
  479. ON CONFLICT(timestamp) DO UPDATE SET count = count + ?2;",
  480. )?;
  481. for (k, v) in hourly_count {
  482. stmt.execute(rusqlite::params![k, v])?;
  483. }
  484. stmt.finalize()?;
  485. Ok(())
  486. }
  487. pub fn update_domain_daily_count(
  488. &self,
  489. tx: &Transaction<'_>,
  490. daily_count: &HashMap<u64, u32>,
  491. ) -> Result<(), Box<dyn Error>> {
  492. let mut stmt = tx.prepare(
  493. "INSERT INTO domain_daily_count (timestamp, count)
  494. VALUES (
  495. ?1,
  496. ?2
  497. )
  498. ON CONFLICT(timestamp) DO UPDATE SET count = count + ?2;",
  499. )?;
  500. for (k, v) in daily_count {
  501. stmt.execute(rusqlite::params![k, v])?;
  502. }
  503. stmt.finalize()?;
  504. Ok(())
  505. }
  506. pub fn insert_domain(&self, data: &Vec<DomainData>) -> Result<(), Box<dyn Error>> {
  507. let local_offset = Local::now().offset().local_minus_utc();
  508. let mut conn = self.conn.lock().unwrap();
  509. if conn.as_ref().is_none() {
  510. return Err("db is not open".into());
  511. }
  512. let mut hourly_count = HashMap::new();
  513. let mut daily_count = HashMap::new();
  514. let conn = conn.as_mut().unwrap();
  515. let tx = conn.transaction()?;
  516. let mut stmt = tx.prepare(
  517. "INSERT INTO domain \
  518. (timestamp, domain, domain_type, client, domain_group, reply_code, query_time, ping_time, is_blocked, is_cached) \
  519. VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)")?;
  520. for d in data {
  521. let ret = stmt.execute(rusqlite::params![
  522. &d.timestamp.to_string(),
  523. &d.domain,
  524. &d.domain_type.to_string(),
  525. &d.client,
  526. &d.domain_group,
  527. &d.reply_code,
  528. &d.query_time,
  529. &d.ping_time,
  530. &(d.is_blocked as i32),
  531. &(d.is_cached as i32)
  532. ]);
  533. if let Err(e) = ret {
  534. stmt.finalize()?;
  535. tx.rollback()?;
  536. return Err(Box::new(e));
  537. }
  538. let localtimestamp = d.timestamp + local_offset as u64 * 1000;
  539. let hour_timestamp =
  540. localtimestamp - localtimestamp % 3600000 - local_offset as u64 * 1000;
  541. let day_timestamp =
  542. localtimestamp - localtimestamp % 86400000 - local_offset as u64 * 1000;
  543. hourly_count
  544. .entry(hour_timestamp)
  545. .and_modify(|v| *v += 1)
  546. .or_insert(1);
  547. daily_count
  548. .entry(day_timestamp)
  549. .and_modify(|v| *v += 1)
  550. .or_insert(1);
  551. }
  552. stmt.finalize()?;
  553. self.update_domain_hourly_count(&tx, &hourly_count)?;
  554. self.update_domain_daily_count(&tx, &daily_count)?;
  555. tx.commit()?;
  556. Ok(())
  557. }
  558. pub fn get_db_file_path(&self) -> Option<String> {
  559. let conn = self.conn.lock().unwrap();
  560. if conn.is_none() {
  561. return None;
  562. }
  563. let conn = conn.as_ref().unwrap();
  564. conn.path().map(|v| v.to_string())
  565. }
  566. pub fn get_readonly_conn(&self) -> Option<Connection> {
  567. let conn = self.conn.lock().unwrap();
  568. if conn.is_none() {
  569. return None;
  570. }
  571. let conn = conn.as_ref().unwrap();
  572. let read_conn = Connection::open_with_flags(
  573. conn.path().unwrap(),
  574. OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
  575. );
  576. if let Err(_) = read_conn {
  577. return None;
  578. }
  579. Some(read_conn.unwrap())
  580. }
  581. /// # Returns
  582. ///
  583. /// A tuple containing:
  584. /// - `String`: The SQL WHERE clause.
  585. /// - `String`: The SQL ORDER BY clause.
  586. /// - `Vec<String>`: The parameters for the SQL query.
  587. pub fn get_domain_sql_where(
  588. param: Option<&DomainListGetParam>,
  589. ) -> Result<(String, String, Vec<String>), Box<dyn Error>> {
  590. let mut is_desc_order = true;
  591. let mut is_cursor_prev = false;
  592. let param = match param {
  593. Some(v) => v,
  594. None => return Ok((String::new(), String::new(), Vec::new())),
  595. };
  596. let mut order_timestamp_first = true;
  597. let mut cusor_with_timestamp = false;
  598. let mut sql_where = Vec::new();
  599. let mut sql_param: Vec<String> = Vec::new();
  600. let mut sql_order = String::new();
  601. if let Some(v) = &param.id {
  602. sql_where.push("id = ?".to_string());
  603. sql_param.push(v.to_string());
  604. order_timestamp_first = false;
  605. }
  606. if let Some(v) = &param.order {
  607. if v.eq_ignore_ascii_case("asc") {
  608. is_cursor_prev = true;
  609. } else if v.eq_ignore_ascii_case("desc") {
  610. is_cursor_prev = false;
  611. } else {
  612. return Err("order param error".into());
  613. }
  614. }
  615. if let Some(v) = &param.cursor {
  616. if v.direction.eq_ignore_ascii_case("prev") {
  617. is_desc_order = !is_desc_order;
  618. } else if v.direction.eq_ignore_ascii_case("next") {
  619. is_desc_order = is_desc_order;
  620. } else {
  621. return Err("cursor direction param error".into());
  622. }
  623. }
  624. if let Some(v) = &param.domain {
  625. if let Some(m) = &param.domain_filter_mode {
  626. match m.as_str() {
  627. "endwith" => {
  628. sql_where.push("domain LIKE ?".to_string());
  629. sql_param.push(format!("{}%", v));
  630. }
  631. "startwith" => {
  632. sql_where.push("domain LIKE ?".to_string());
  633. sql_param.push(format!("%{}", v));
  634. }
  635. "contains" => {
  636. sql_where.push("domain LIKE ?".to_string());
  637. sql_param.push(format!("%{}%", v));
  638. }
  639. "equals" => {
  640. sql_where.push("domain = ?".to_string());
  641. sql_param.push(v.to_string());
  642. }
  643. _ => return Err("domain_filter_mode param error".into()),
  644. }
  645. } else {
  646. sql_where.push("domain = ?".to_string());
  647. sql_param.push(v.to_string());
  648. order_timestamp_first = false;
  649. }
  650. }
  651. if let Some(v) = &param.domain_type {
  652. sql_where.push("domain_type = ?".to_string());
  653. sql_param.push(v.to_string());
  654. order_timestamp_first = false;
  655. }
  656. if let Some(v) = &param.client {
  657. sql_where.push("client = ?".to_string());
  658. sql_param.push(v.clone());
  659. order_timestamp_first = false;
  660. }
  661. if let Some(v) = &param.domain_group {
  662. sql_where.push("domain_group = ?".to_string());
  663. sql_param.push(v.clone());
  664. order_timestamp_first = false;
  665. }
  666. if let Some(v) = &param.reply_code {
  667. sql_where.push("reply_code = ?".to_string());
  668. sql_param.push(v.to_string());
  669. order_timestamp_first = false;
  670. }
  671. if let Some(v) = &param.timestamp_before {
  672. let mut use_cursor = false;
  673. if param.cursor.is_some() && (is_desc_order || is_cursor_prev) {
  674. let v = param.cursor.as_ref().unwrap().id;
  675. if let Some(v) = v {
  676. sql_where.push("id < ?".to_string());
  677. sql_param.push(v.to_string());
  678. use_cursor = true;
  679. order_timestamp_first = false;
  680. cusor_with_timestamp = true;
  681. }
  682. }
  683. if use_cursor == false {
  684. sql_where.push("timestamp <= ?".to_string());
  685. sql_param.push(v.to_string());
  686. }
  687. }
  688. if let Some(v) = &param.timestamp_after {
  689. let mut use_cursor = false;
  690. if param.cursor.is_some() && (!is_desc_order || is_cursor_prev) {
  691. let v = param.cursor.as_ref().unwrap().id;
  692. if let Some(v) = v {
  693. sql_where.push("id > ?".to_string());
  694. sql_param.push(v.to_string());
  695. use_cursor = true;
  696. order_timestamp_first = false;
  697. cusor_with_timestamp = true;
  698. }
  699. }
  700. if use_cursor == false {
  701. sql_where.push("timestamp >= ?".to_string());
  702. sql_param.push(v.to_string());
  703. }
  704. }
  705. if !cusor_with_timestamp {
  706. if let Some(v) = &param.cursor {
  707. if is_cursor_prev {
  708. if let Some(id) = &v.id {
  709. if is_desc_order {
  710. sql_where.push("id > ?".to_string());
  711. } else {
  712. sql_where.push("id < ?".to_string());
  713. }
  714. sql_param.push(id.to_string());
  715. order_timestamp_first = false;
  716. }
  717. } else {
  718. if let Some(id) = &v.id {
  719. if is_desc_order {
  720. sql_where.push("id < ?".to_string());
  721. } else {
  722. sql_where.push("id > ?".to_string());
  723. }
  724. sql_param.push(id.to_string());
  725. order_timestamp_first = false;
  726. }
  727. }
  728. }
  729. }
  730. if let Some(v) = &param.is_blocked {
  731. if *v {
  732. sql_where.push("is_blocked = 1".to_string());
  733. } else {
  734. sql_where.push("is_blocked = 0".to_string());
  735. }
  736. order_timestamp_first = false;
  737. }
  738. if let Some(v) = &param.is_cached {
  739. if *v {
  740. sql_where.push("is_cached = 1".to_string());
  741. } else {
  742. sql_where.push("is_cached = 0".to_string());
  743. }
  744. order_timestamp_first = false;
  745. }
  746. if is_cursor_prev {
  747. is_desc_order = !is_desc_order;
  748. }
  749. if is_desc_order {
  750. if order_timestamp_first {
  751. sql_order.push_str(" ORDER BY timestamp DESC, id DESC");
  752. } else {
  753. sql_order.push_str(" ORDER BY id DESC, timestamp DESC");
  754. }
  755. } else {
  756. if order_timestamp_first {
  757. sql_order.push_str(" ORDER BY timestamp ASC, id ASC");
  758. } else {
  759. sql_order.push_str(" ORDER BY id ASC, timestamp ASC");
  760. }
  761. }
  762. let sql_where = if sql_where.is_empty() {
  763. String::new()
  764. } else {
  765. format!(" WHERE {}", sql_where.join(" AND "))
  766. };
  767. Ok((sql_where, sql_order, sql_param))
  768. }
  769. pub fn get_domain_list_count(&self, param: Option<&DomainListGetParam>) -> u64 {
  770. let conn = self.get_readonly_conn();
  771. if conn.as_ref().is_none() {
  772. return 0;
  773. }
  774. let conn = conn.as_ref().unwrap();
  775. let mut sql = String::new();
  776. let mut sql_param = Vec::new();
  777. sql.push_str("SELECT COUNT(*) FROM domain");
  778. if let Ok((sql_where, sql_order, mut ret_sql_param)) = Self::get_domain_sql_where(param) {
  779. sql.push_str(sql_where.as_str());
  780. sql.push_str(sql_order.as_str());
  781. sql_param.append(&mut ret_sql_param);
  782. }
  783. let mut stmt = conn.prepare(sql.as_str()).unwrap();
  784. let rows = stmt.query_map(rusqlite::params_from_iter(sql_param), |row| Ok(row.get(0)?));
  785. if let Ok(rows) = rows {
  786. for row in rows {
  787. if let Ok(row) = row {
  788. return row;
  789. }
  790. }
  791. }
  792. 0
  793. }
  794. pub fn delete_domain_by_id(&self, id: u64) -> Result<u64, Box<dyn Error>> {
  795. let conn = self.conn.lock().unwrap();
  796. if conn.as_ref().is_none() {
  797. return Err("db is not open".into());
  798. }
  799. let conn = conn.as_ref().unwrap();
  800. let ret = conn.execute("DELETE FROM domain WHERE id = ?", &[&id]);
  801. if let Err(e) = ret {
  802. return Err(Box::new(e));
  803. }
  804. Ok(ret.unwrap() as u64)
  805. }
  806. pub fn delete_domain_before_timestamp(&self, timestamp: u64) -> Result<u64, Box<dyn Error>> {
  807. let conn = self.conn.lock().unwrap();
  808. if conn.as_ref().is_none() {
  809. return Err("db is not open".into());
  810. }
  811. let conn = conn.as_ref().unwrap();
  812. let ret = conn.execute("DELETE FROM domain WHERE timestamp <= ?", &[&timestamp]);
  813. if let Err(e) = ret {
  814. return Err(Box::new(e));
  815. }
  816. Ok(ret.unwrap() as u64)
  817. }
  818. pub fn refresh_client_top_list(&self, timestamp: u64) -> Result<(), Box<dyn Error>> {
  819. let mut client_count_list = Vec::new();
  820. let conn = match self.get_readonly_conn() {
  821. Some(v) => v,
  822. None => return Err("db is not open".into()),
  823. };
  824. let timestamp_now = smartdns::get_utc_time_ms();
  825. let sql = "SELECT client, COUNT(*) FROM domain WHERE timestamp >= ? GROUP BY client ORDER BY COUNT(*) DESC LIMIT 20";
  826. self.debug_query_plan(&conn, sql.to_string(), &vec![timestamp.to_string()]);
  827. let mut stmt = conn.prepare(sql)?;
  828. let rows = stmt.query_map([timestamp.to_string()], |row| {
  829. Ok(ClientQueryCount {
  830. client_ip: row.get(0)?,
  831. count: row.get(1)?,
  832. timestamp_start: timestamp,
  833. timestamp_end: timestamp_now,
  834. })
  835. });
  836. if let Ok(rows) = rows {
  837. for row in rows {
  838. if let Ok(row) = row {
  839. client_count_list.push(row);
  840. }
  841. }
  842. }
  843. let mut conn = self.conn.lock().unwrap();
  844. if conn.as_ref().is_none() {
  845. return Err("db is not open".into());
  846. }
  847. let conn = conn.as_mut().unwrap();
  848. let tx = conn.transaction()?;
  849. let mut stmt = tx.prepare("DELETE FROM top_client_list")?;
  850. stmt.execute([])?;
  851. stmt.finalize()?;
  852. let mut stmt =
  853. tx.prepare("INSERT INTO top_client_list (client, count, timestamp_start, timestamp_end) VALUES ( ?1, ?2, $3, $4)")?;
  854. for client in &client_count_list {
  855. stmt.execute(rusqlite::params![
  856. client.client_ip,
  857. client.count,
  858. client.timestamp_start,
  859. client.timestamp_end
  860. ])?;
  861. dns_log!(
  862. LogLevel::DEBUG,
  863. "client: {}, count: {}, timestamp_start: {}, timestamp_end: {}",
  864. client.client_ip,
  865. client.count,
  866. client.timestamp_start,
  867. client.timestamp_end
  868. );
  869. }
  870. stmt.finalize()?;
  871. tx.commit()?;
  872. Ok(())
  873. }
  874. pub fn get_client_top_list(&self, count: u32) -> Result<Vec<ClientQueryCount>, Box<dyn Error>> {
  875. let mut ret = Vec::new();
  876. let conn = self.get_readonly_conn();
  877. if conn.as_ref().is_none() {
  878. return Err("db is not open".into());
  879. }
  880. let conn = conn.as_ref().unwrap();
  881. let mut stmt =
  882. conn.prepare("SELECT client, count, timestamp_start, timestamp_end FROM top_client_list ORDER BY count DESC LIMIT ?")?;
  883. let rows = stmt.query_map([count.to_string()], |row| {
  884. Ok(ClientQueryCount {
  885. client_ip: row.get(0)?,
  886. count: row.get(1)?,
  887. timestamp_start: row.get(2)?,
  888. timestamp_end: row.get(3)?,
  889. })
  890. });
  891. if let Ok(rows) = rows {
  892. for row in rows {
  893. if let Ok(row) = row {
  894. ret.push(row);
  895. }
  896. }
  897. }
  898. Ok(ret)
  899. }
  900. pub fn delete_daily_query_count_before_timestamp(
  901. &self,
  902. timestamp: u64,
  903. ) -> Result<u64, Box<dyn Error>> {
  904. let conn = self.conn.lock().unwrap();
  905. if conn.as_ref().is_none() {
  906. return Err("db is not open".into());
  907. }
  908. let conn = conn.as_ref().unwrap();
  909. let ret = conn.execute(
  910. "DELETE FROM domain_daily_count WHERE timestamp <= ?",
  911. &[&timestamp],
  912. );
  913. if let Err(e) = ret {
  914. return Err(Box::new(e));
  915. }
  916. Ok(ret.unwrap() as u64)
  917. }
  918. pub fn get_daily_query_count(&self, past_days: u32) -> Result<DailyQueryCount, Box<dyn Error>> {
  919. let mut ret = Vec::new();
  920. let conn = self.get_readonly_conn();
  921. if conn.as_ref().is_none() {
  922. return Err("db is not open".into());
  923. }
  924. let conn = conn.as_ref().unwrap();
  925. let seconds = 86400 * past_days - utils::seconds_until_next_hour() as u32;
  926. let mut stmt = conn.prepare(
  927. "SELECT \
  928. strftime('%Y-%m-%d', datetime(timestamp / 1000, 'unixepoch', 'localtime')) AS date, timestamp, count \
  929. FROM \
  930. domain_daily_count \
  931. WHERE \
  932. timestamp >= strftime('%s', 'now') * 1000 - ? * 1000 \
  933. ORDER BY \
  934. timestamp DESC;\
  935. ",
  936. )?;
  937. let rows = stmt.query_map([seconds.to_string()], |row| {
  938. Ok(DailyQueryCountItem {
  939. day: row.get(0)?,
  940. query_count: row.get(2)?,
  941. })
  942. });
  943. if let Ok(rows) = rows {
  944. for row in rows {
  945. if let Ok(row) = row {
  946. ret.push(row);
  947. }
  948. }
  949. }
  950. Ok(DailyQueryCount {
  951. query_timestamp: smartdns::get_utc_time_ms(),
  952. daily_query_count: ret,
  953. })
  954. }
  955. pub fn delete_hourly_query_count_before_timestamp(
  956. &self,
  957. timestamp: u64,
  958. ) -> Result<u64, Box<dyn Error>> {
  959. let conn = self.conn.lock().unwrap();
  960. if conn.as_ref().is_none() {
  961. return Err("db is not open".into());
  962. }
  963. let conn = conn.as_ref().unwrap();
  964. let ret = conn.execute(
  965. "DELETE FROM domain_hourly_count WHERE timestamp <= ?",
  966. &[&timestamp],
  967. );
  968. if let Err(e) = ret {
  969. return Err(Box::new(e));
  970. }
  971. Ok(ret.unwrap() as u64)
  972. }
  973. pub fn get_hourly_query_count(
  974. &self,
  975. past_hours: u32,
  976. ) -> Result<HourlyQueryCount, Box<dyn Error>> {
  977. let mut ret = Vec::new();
  978. let conn = self.get_readonly_conn();
  979. if conn.as_ref().is_none() {
  980. return Err("db is not open".into());
  981. }
  982. let query_start = std::time::Instant::now();
  983. let conn = conn.as_ref().unwrap();
  984. let seconds = 3600 * past_hours - utils::seconds_until_next_hour() as u32;
  985. let sql = "SELECT \
  986. strftime('%Y-%m-%d %H:00:00', datetime(timestamp / 1000, 'unixepoch', 'localtime')) AS hour, timestamp, count \
  987. FROM \
  988. domain_hourly_count \
  989. WHERE \
  990. timestamp >= strftime('%s', 'now') * 1000 - ? * 1000 \
  991. ORDER BY \
  992. timestamp DESC;\
  993. ";
  994. self.debug_query_plan(conn, sql.to_string(), &vec![seconds.to_string()]);
  995. let mut stmt = conn.prepare(sql)?;
  996. let rows = stmt.query_map([seconds.to_string()], |row| {
  997. Ok(HourlyQueryCountItem {
  998. hour: row.get(0)?,
  999. query_count: row.get(2)?,
  1000. })
  1001. });
  1002. if let Ok(rows) = rows {
  1003. for row in rows {
  1004. if let Ok(row) = row {
  1005. ret.push(row);
  1006. }
  1007. }
  1008. }
  1009. dns_log!(
  1010. LogLevel::DEBUG,
  1011. "hourly_query_count time: {}ms",
  1012. query_start.elapsed().as_millis()
  1013. );
  1014. Ok(HourlyQueryCount {
  1015. query_timestamp: smartdns::get_utc_time_ms(),
  1016. hourly_query_count: ret,
  1017. })
  1018. }
  1019. pub fn refresh_domain_top_list(&self, timestamp: u64) -> Result<(), Box<dyn Error>> {
  1020. let mut domain_count_list = Vec::new();
  1021. let conn = match self.get_readonly_conn() {
  1022. Some(v) => v,
  1023. None => return Err("db is not open".into()),
  1024. };
  1025. let timestamp_now = smartdns::get_utc_time_ms();
  1026. let sql = "SELECT domain, COUNT(*) FROM domain WHERE timestamp >= ? GROUP BY domain ORDER BY COUNT(*) DESC LIMIT 20";
  1027. self.debug_query_plan(&conn, sql.to_string(), &vec![timestamp.to_string()]);
  1028. let mut stmt = conn.prepare(sql)?;
  1029. let rows = stmt.query_map([timestamp.to_string()], |row| {
  1030. Ok(DomainQueryCount {
  1031. domain: row.get(0)?,
  1032. count: row.get(1)?,
  1033. timestamp_start: timestamp,
  1034. timestamp_end: timestamp_now,
  1035. })
  1036. });
  1037. if let Ok(rows) = rows {
  1038. for row in rows {
  1039. if let Ok(row) = row {
  1040. domain_count_list.push(row);
  1041. }
  1042. }
  1043. }
  1044. let mut conn = self.conn.lock().unwrap();
  1045. if conn.as_ref().is_none() {
  1046. return Err("db is not open".into());
  1047. }
  1048. let conn = conn.as_mut().unwrap();
  1049. let tx = conn.transaction()?;
  1050. let mut stmt = tx.prepare("DELETE FROM top_domain_list")?;
  1051. stmt.execute([])?;
  1052. stmt.finalize()?;
  1053. let mut stmt =
  1054. tx.prepare("INSERT INTO top_domain_list (domain, count, timestamp_start, timestamp_end) VALUES ( ?1, ?2, ?3, ?4)")?;
  1055. for domain in &domain_count_list {
  1056. stmt.execute(rusqlite::params![
  1057. domain.domain,
  1058. domain.count,
  1059. domain.timestamp_start,
  1060. domain.timestamp_end
  1061. ])?;
  1062. }
  1063. stmt.finalize()?;
  1064. tx.commit()?;
  1065. Ok(())
  1066. }
  1067. pub fn get_domain_top_list(&self, count: u32) -> Result<Vec<DomainQueryCount>, Box<dyn Error>> {
  1068. let mut ret = Vec::new();
  1069. let conn = self.get_readonly_conn();
  1070. if conn.as_ref().is_none() {
  1071. return Err("db is not open".into());
  1072. }
  1073. let conn = conn.as_ref().unwrap();
  1074. let mut stmt = conn.prepare("SELECT domain, count, timestamp_start, timestamp_end FROM top_domain_list DESC LIMIT ?")?;
  1075. let rows = stmt.query_map([count.to_string()], |row| {
  1076. Ok(DomainQueryCount {
  1077. domain: row.get(0)?,
  1078. count: row.get(1)?,
  1079. timestamp_start: row.get(2)?,
  1080. timestamp_end: row.get(3)?,
  1081. })
  1082. });
  1083. if let Err(e) = rows {
  1084. return Err(Box::new(e));
  1085. }
  1086. if let Ok(rows) = rows {
  1087. for row in rows {
  1088. if let Ok(row) = row {
  1089. ret.push(row);
  1090. }
  1091. }
  1092. }
  1093. Ok(ret)
  1094. }
  1095. pub fn get_domain_list(
  1096. &self,
  1097. param: Option<&DomainListGetParam>,
  1098. ) -> Result<QueryDomainListResult, Box<dyn Error>> {
  1099. let query_start = std::time::Instant::now();
  1100. let mut cursor_reverse = false;
  1101. let mut ret = QueryDomainListResult {
  1102. domain_list: vec![],
  1103. total_count: 0,
  1104. step_by_cursor: false,
  1105. };
  1106. let conn = self.get_readonly_conn();
  1107. if conn.as_ref().is_none() {
  1108. return Err("db is not open".into());
  1109. }
  1110. let conn = conn.as_ref().unwrap();
  1111. let (sql_where, sql_order, mut sql_param) = Self::get_domain_sql_where(param)?;
  1112. let mut sql = String::new();
  1113. sql.push_str("SELECT id, timestamp, domain, domain_type, client, domain_group, reply_code, query_time, ping_time, is_blocked, is_cached FROM domain");
  1114. sql.push_str(sql_where.as_str());
  1115. sql.push_str(sql_order.as_str());
  1116. if let Some(p) = param {
  1117. let mut with_offset = true;
  1118. if let Some(cursor) = &p.cursor {
  1119. if cursor.id.is_some() {
  1120. sql.push_str(" LIMIT ?");
  1121. sql_param.push(p.page_size.to_string());
  1122. with_offset = false;
  1123. }
  1124. if cursor.direction.eq_ignore_ascii_case("prev") {
  1125. cursor_reverse = true;
  1126. }
  1127. }
  1128. if with_offset {
  1129. sql.push_str(" LIMIT ? OFFSET ?");
  1130. sql_param.push(p.page_size.to_string());
  1131. sql_param.push(((p.page_num - 1) * p.page_size).to_string());
  1132. }
  1133. }
  1134. self.debug_query_plan(conn, sql.clone(), &sql_param);
  1135. let stmt = conn.prepare(&sql);
  1136. if let Err(e) = stmt {
  1137. dns_log!(LogLevel::ERROR, "get_domain_list error: {}", e);
  1138. return Err("get_domain_list error".into());
  1139. }
  1140. let mut stmt = stmt?;
  1141. let rows = stmt.query_map(rusqlite::params_from_iter(sql_param), |row| {
  1142. Ok(DomainData {
  1143. id: row.get(0)?,
  1144. timestamp: row.get(1)?,
  1145. domain: row.get(2)?,
  1146. domain_type: row.get(3)?,
  1147. client: row.get(4)?,
  1148. domain_group: row.get(5)?,
  1149. reply_code: row.get(6)?,
  1150. query_time: row.get(7)?,
  1151. ping_time: row.get(8)?,
  1152. is_blocked: row.get(9)?,
  1153. is_cached: row.get(10)?,
  1154. })
  1155. });
  1156. if let Err(e) = rows {
  1157. return Err(Box::new(e));
  1158. }
  1159. if let Ok(rows) = rows {
  1160. for row in rows {
  1161. if let Ok(row) = row {
  1162. ret.domain_list.push(row);
  1163. }
  1164. }
  1165. }
  1166. if cursor_reverse {
  1167. ret.domain_list.reverse();
  1168. }
  1169. if let Some(p) = param {
  1170. if let Some(v) = &p.cursor {
  1171. ret.total_count = v.total_count;
  1172. ret.step_by_cursor = true;
  1173. } else {
  1174. let total_count = self.get_domain_list_count(param);
  1175. ret.total_count = total_count;
  1176. }
  1177. }
  1178. dns_log!(
  1179. LogLevel::DEBUG,
  1180. "domain_list time: {}ms",
  1181. query_start.elapsed().as_millis()
  1182. );
  1183. Ok(ret)
  1184. }
  1185. pub fn insert_client(&self, client_data: &Vec<ClientData>) -> Result<(), Box<dyn Error>> {
  1186. let mut conn = self.conn.lock().unwrap();
  1187. if conn.as_ref().is_none() {
  1188. return Err("db is not open".into());
  1189. }
  1190. let conn = conn.as_mut().unwrap();
  1191. let tx = conn.transaction()?;
  1192. let mut stmt = tx.prepare("INSERT INTO client (id, client_ip, mac, hostname, last_query_timestamp) VALUES (
  1193. (SELECT MAX(rowid) FROM client) + 1,
  1194. ?1, ?2, ?3, ?4)
  1195. ON CONFLICT(client_ip, mac) DO UPDATE SET last_query_timestamp = excluded.last_query_timestamp;
  1196. ")?;
  1197. for d in client_data {
  1198. let ret = stmt.execute(rusqlite::params![
  1199. d.client_ip,
  1200. d.mac,
  1201. d.hostname,
  1202. d.last_query_timestamp
  1203. ]);
  1204. if let Err(e) = ret {
  1205. stmt.finalize()?;
  1206. tx.rollback()?;
  1207. return Err(Box::new(e));
  1208. }
  1209. }
  1210. stmt.finalize()?;
  1211. tx.commit()?;
  1212. Ok(())
  1213. }
  1214. pub fn get_client_list_count(&self, param: Option<&ClientListGetParam>) -> u64 {
  1215. let conn = self.get_readonly_conn();
  1216. if conn.as_ref().is_none() {
  1217. return 0;
  1218. }
  1219. let conn = conn.as_ref().unwrap();
  1220. let mut sql = String::new();
  1221. let mut sql_param = Vec::new();
  1222. sql.push_str("SELECT COUNT(*) FROM client");
  1223. if let Ok((sql_where, sql_order, mut ret_sql_param)) = Self::get_client_sql_where(param) {
  1224. sql.push_str(sql_where.as_str());
  1225. sql.push_str(sql_order.as_str());
  1226. sql_param.append(&mut ret_sql_param);
  1227. }
  1228. let mut stmt = conn.prepare(sql.as_str()).unwrap();
  1229. let rows = stmt.query_map(rusqlite::params_from_iter(sql_param), |row| Ok(row.get(0)?));
  1230. if let Ok(rows) = rows {
  1231. for row in rows {
  1232. if let Ok(row) = row {
  1233. return row;
  1234. }
  1235. }
  1236. }
  1237. 0
  1238. }
  1239. fn get_client_sql_where(
  1240. param: Option<&ClientListGetParam>,
  1241. ) -> Result<(String, String, Vec<String>), Box<dyn Error>> {
  1242. let mut is_desc_order = true;
  1243. let mut is_cursor_prev = false;
  1244. let param = match param {
  1245. Some(v) => v,
  1246. None => return Ok((String::new(), String::new(), Vec::new())),
  1247. };
  1248. let mut order_timestamp_first = false;
  1249. let mut cusor_with_timestamp = false;
  1250. let mut sql_where = Vec::new();
  1251. let mut sql_param: Vec<String> = Vec::new();
  1252. let mut sql_order = String::new();
  1253. if let Some(v) = &param.id {
  1254. sql_where.push("id = ?".to_string());
  1255. sql_param.push(v.to_string());
  1256. order_timestamp_first = false;
  1257. }
  1258. if let Some(v) = &param.order {
  1259. if v.eq_ignore_ascii_case("asc") {
  1260. is_cursor_prev = true;
  1261. } else if v.eq_ignore_ascii_case("desc") {
  1262. is_cursor_prev = false;
  1263. } else {
  1264. return Err("order param error".into());
  1265. }
  1266. }
  1267. if let Some(v) = &param.cursor {
  1268. if v.direction.eq_ignore_ascii_case("prev") {
  1269. is_desc_order = !is_desc_order;
  1270. } else if v.direction.eq_ignore_ascii_case("next") {
  1271. is_desc_order = is_desc_order;
  1272. } else {
  1273. return Err("cursor direction param error".into());
  1274. }
  1275. }
  1276. if let Some(v) = &param.client_ip {
  1277. sql_where.push("client_ip = ?".to_string());
  1278. sql_param.push(v.to_string());
  1279. }
  1280. if let Some(v) = &param.mac {
  1281. sql_where.push("mac = ?".to_string());
  1282. sql_param.push(v.to_string());
  1283. }
  1284. if let Some(v) = &param.hostname {
  1285. sql_where.push("hostname = ?".to_string());
  1286. sql_param.push(v.to_string());
  1287. }
  1288. if let Some(v) = &param.timestamp_before {
  1289. let mut use_cursor = false;
  1290. if param.cursor.is_some() && (is_desc_order || is_cursor_prev) {
  1291. let v = param.cursor.as_ref().unwrap().id;
  1292. if let Some(v) = v {
  1293. sql_where.push("id < ?".to_string());
  1294. sql_param.push(v.to_string());
  1295. use_cursor = true;
  1296. order_timestamp_first = false;
  1297. cusor_with_timestamp = true;
  1298. }
  1299. }
  1300. if use_cursor == false {
  1301. sql_where.push("last_query_timestamp <= ?".to_string());
  1302. sql_param.push(v.to_string());
  1303. }
  1304. }
  1305. if let Some(v) = &param.timestamp_after {
  1306. let mut use_cursor = false;
  1307. if param.cursor.is_some() && (!is_desc_order || is_cursor_prev) {
  1308. let v = param.cursor.as_ref().unwrap().id;
  1309. if let Some(v) = v {
  1310. sql_where.push("id > ?".to_string());
  1311. sql_param.push(v.to_string());
  1312. use_cursor = true;
  1313. order_timestamp_first = false;
  1314. cusor_with_timestamp = true;
  1315. }
  1316. }
  1317. if use_cursor == false {
  1318. sql_where.push("last_query_timestamp >= ?".to_string());
  1319. sql_param.push(v.to_string());
  1320. }
  1321. }
  1322. if !cusor_with_timestamp {
  1323. if let Some(v) = &param.cursor {
  1324. if is_cursor_prev {
  1325. if let Some(id) = &v.id {
  1326. if is_desc_order {
  1327. sql_where.push("id > ?".to_string());
  1328. } else {
  1329. sql_where.push("id < ?".to_string());
  1330. }
  1331. sql_param.push(id.to_string());
  1332. order_timestamp_first = false;
  1333. }
  1334. } else {
  1335. if let Some(id) = &v.id {
  1336. if is_desc_order {
  1337. sql_where.push("id < ?".to_string());
  1338. } else {
  1339. sql_where.push("id > ?".to_string());
  1340. }
  1341. sql_param.push(id.to_string());
  1342. order_timestamp_first = false;
  1343. }
  1344. }
  1345. }
  1346. }
  1347. if is_desc_order {
  1348. if order_timestamp_first {
  1349. sql_order.push_str(" ORDER BY last_query_timestamp DESC, id DESC");
  1350. } else {
  1351. sql_order.push_str(" ORDER BY id DESC, last_query_timestamp DESC");
  1352. }
  1353. } else {
  1354. if order_timestamp_first {
  1355. sql_order.push_str(" ORDER BY last_query_timestamp ASC, id ASC");
  1356. } else {
  1357. sql_order.push_str(" ORDER BY id ASC, last_query_timestamp ASC");
  1358. }
  1359. }
  1360. let sql_where = if sql_where.is_empty() {
  1361. String::new()
  1362. } else {
  1363. format!(" WHERE {}", sql_where.join(" AND "))
  1364. };
  1365. Ok((sql_where, sql_order, sql_param))
  1366. }
  1367. pub fn get_client_list(
  1368. &self,
  1369. param: Option<&ClientListGetParam>,
  1370. ) -> Result<QueryClientListResult, Box<dyn Error>> {
  1371. let query_start = std::time::Instant::now();
  1372. let mut cursor_reverse = false;
  1373. let mut ret = QueryClientListResult {
  1374. client_list: vec![],
  1375. total_count: 0,
  1376. step_by_cursor: false,
  1377. };
  1378. let conn = self.get_readonly_conn();
  1379. if conn.as_ref().is_none() {
  1380. return Err("db is not open".into());
  1381. }
  1382. let conn = conn.as_ref().unwrap();
  1383. let (sql_where, sql_order, mut sql_param) = Self::get_client_sql_where(param)?;
  1384. let mut sql = String::new();
  1385. sql.push_str("SELECT id, client_ip, mac, hostname, last_query_timestamp FROM client");
  1386. sql.push_str(sql_where.as_str());
  1387. sql.push_str(sql_order.as_str());
  1388. if let Some(p) = param {
  1389. let mut with_offset = true;
  1390. if let Some(cursor) = &p.cursor {
  1391. if cursor.id.is_some() {
  1392. sql.push_str(" LIMIT ?");
  1393. sql_param.push(p.page_size.to_string());
  1394. with_offset = false;
  1395. }
  1396. if cursor.direction.eq_ignore_ascii_case("prev") {
  1397. cursor_reverse = true;
  1398. }
  1399. }
  1400. if with_offset {
  1401. sql.push_str(" LIMIT ? OFFSET ?");
  1402. sql_param.push(p.page_size.to_string());
  1403. sql_param.push(((p.page_num - 1) * p.page_size).to_string());
  1404. }
  1405. }
  1406. self.debug_query_plan(conn, sql.clone(), &sql_param);
  1407. let stmt = conn.prepare(&sql);
  1408. if let Err(e) = stmt {
  1409. dns_log!(LogLevel::ERROR, "get_client_list error: {}", e);
  1410. return Err("get_client_list error".into());
  1411. }
  1412. let mut stmt = stmt?;
  1413. let rows = stmt.query_map(rusqlite::params_from_iter(sql_param), |row| {
  1414. Ok(ClientData {
  1415. id: row.get(0)?,
  1416. client_ip: row.get(1)?,
  1417. mac: row.get(2)?,
  1418. hostname: row.get(3)?,
  1419. last_query_timestamp: row.get(4)?,
  1420. })
  1421. });
  1422. if let Err(e) = rows {
  1423. return Err(Box::new(e));
  1424. }
  1425. if let Ok(rows) = rows {
  1426. for row in rows {
  1427. if let Ok(row) = row {
  1428. ret.client_list.push(row);
  1429. }
  1430. }
  1431. }
  1432. if cursor_reverse {
  1433. ret.client_list.reverse();
  1434. }
  1435. if let Some(p) = param {
  1436. if let Some(v) = &p.cursor {
  1437. ret.total_count = v.total_count;
  1438. ret.step_by_cursor = true;
  1439. } else {
  1440. let total_count = self.get_client_list_count(param);
  1441. ret.total_count = total_count;
  1442. }
  1443. }
  1444. dns_log!(
  1445. LogLevel::DEBUG,
  1446. "domain_list time: {}ms",
  1447. query_start.elapsed().as_millis()
  1448. );
  1449. Ok(ret)
  1450. }
  1451. pub fn delete_client_by_id(&self, id: u64) -> Result<u64, Box<dyn Error>> {
  1452. let conn = self.conn.lock().unwrap();
  1453. if conn.as_ref().is_none() {
  1454. return Err("db is not open".into());
  1455. }
  1456. let conn = conn.as_ref().unwrap();
  1457. let ret = conn.execute("DELETE FROM client WHERE id = ?", &[&id]);
  1458. if let Err(e) = ret {
  1459. return Err(Box::new(e));
  1460. }
  1461. Ok(ret.unwrap() as u64)
  1462. }
  1463. pub fn get_db_size(&self) -> u64 {
  1464. let db_file = self.get_db_file_path();
  1465. let mut total_size = 0;
  1466. if db_file.is_none() {
  1467. return 0;
  1468. }
  1469. let db_file = db_file.unwrap();
  1470. let wal_file = db_file.clone() + "-wal";
  1471. let metadata = fs::metadata(db_file);
  1472. if let Err(_) = metadata {
  1473. return 0;
  1474. }
  1475. total_size += metadata.unwrap().len();
  1476. let wal_metadata = fs::metadata(wal_file);
  1477. if let Ok(wal_metadata) = wal_metadata {
  1478. let wal_size = wal_metadata.len();
  1479. total_size += wal_size;
  1480. }
  1481. total_size
  1482. }
  1483. pub fn close(&self) {
  1484. let mut conn = self.conn.lock().unwrap();
  1485. if conn.as_ref().is_none() {
  1486. return;
  1487. }
  1488. if let Some(t) = conn.take() {
  1489. let _ = t.close();
  1490. }
  1491. }
  1492. }
  1493. impl Drop for DB {
  1494. fn drop(&mut self) {
  1495. self.close();
  1496. }
  1497. }