Pārlūkot izejas kodu

desktop: replicate tauri-plugin-shell logic (#13986)

Brendan Allan 2 mēneši atpakaļ
vecāks
revīzija
4025b655a4
1 mainītis faili ar 74 papildinājumiem un 44 dzēšanām
  1. 74 44
      packages/desktop/src-tauri/src/cli.rs

+ 74 - 44
packages/desktop/src-tauri/src/cli.rs

@@ -6,13 +6,17 @@ use process_wrap::tokio::ProcessGroup;
 use process_wrap::tokio::{JobObject, KillOnDrop};
 #[cfg(unix)]
 use std::os::unix::process::ExitStatusExt;
+use std::sync::Arc;
 use std::{process::Stdio, time::Duration};
 use tauri::{AppHandle, Manager, path::BaseDirectory};
 use tauri_plugin_store::StoreExt;
 use tauri_specta::Event;
-use tokio::io::{AsyncBufReadExt, BufReader};
-use tokio::process::Command;
-use tokio::sync::{mpsc, oneshot};
+use tokio::{
+    io::{AsyncBufRead, AsyncBufReadExt, BufReader},
+    process::Command,
+    sync::{mpsc, oneshot},
+    task::JoinHandle,
+};
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::Instrument;
 
@@ -34,8 +38,8 @@ pub struct Config {
 
 #[derive(Clone, Debug)]
 pub enum CommandEvent {
-    Stdout(Vec<u8>),
-    Stderr(Vec<u8>),
+    Stdout(String),
+    Stderr(String),
     Error(String),
     Terminated(TerminatedPayload),
 }
@@ -64,10 +68,11 @@ pub async fn get_config(app: &AppHandle) -> Option<Config> {
 
     events
         .fold(String::new(), async |mut config_str, event| {
-            if let CommandEvent::Stdout(stdout) = event
-                && let Ok(s) = str::from_utf8(&stdout)
-            {
-                config_str += s
+            if let CommandEvent::Stdout(s) = &event {
+                config_str += s.as_str()
+            }
+            if let CommandEvent::Stderr(s) = &event {
+                config_str += s.as_str()
             }
 
             config_str
@@ -317,9 +322,9 @@ pub fn spawn_command(
         cmd
     };
 
-    cmd.stdin(Stdio::null());
     cmd.stdout(Stdio::piped());
     cmd.stderr(Stdio::piped());
+    cmd.stdin(Stdio::null());
 
     #[cfg(windows)]
     cmd.creation_flags(0x0800_0000);
@@ -337,32 +342,24 @@ pub fn spawn_command(
     }
 
     let mut child = wrap.spawn()?;
-    let stdout = child.stdout().take();
-    let stderr = child.stderr().take();
+    let guard = Arc::new(tokio::sync::RwLock::new(()));
     let (tx, rx) = mpsc::channel(256);
     let (kill_tx, mut kill_rx) = mpsc::channel(1);
 
-    if let Some(stdout) = stdout {
-        let tx = tx.clone();
-        tokio::spawn(async move {
-            let mut lines = BufReader::new(stdout).lines();
-            while let Ok(Some(line)) = lines.next_line().await {
-                let _ = tx.send(CommandEvent::Stdout(line.into_bytes())).await;
-            }
-        });
-    }
-
-    if let Some(stderr) = stderr {
-        let tx = tx.clone();
-        tokio::spawn(async move {
-            let mut lines = BufReader::new(stderr).lines();
-            while let Ok(Some(line)) = lines.next_line().await {
-                let _ = tx.send(CommandEvent::Stderr(line.into_bytes())).await;
-            }
-        });
-    }
+    let stdout = spawn_pipe_reader(
+        tx.clone(),
+        guard.clone(),
+        BufReader::new(child.stdout().take().unwrap()),
+        CommandEvent::Stdout,
+    );
+    let stderr = spawn_pipe_reader(
+        tx.clone(),
+        guard.clone(),
+        BufReader::new(child.stderr().take().unwrap()),
+        CommandEvent::Stderr,
+    );
 
-    tokio::spawn(async move {
+    tokio::task::spawn(async move {
         let mut kill_open = true;
         let status = loop {
             match child.try_wait() {
@@ -394,6 +391,9 @@ pub fn spawn_command(
                 let _ = tx.send(CommandEvent::Error(err.to_string())).await;
             }
         }
+
+        stdout.abort();
+        stderr.abort();
     });
 
     let event_stream = ReceiverStream::new(rx);
@@ -404,9 +404,7 @@ pub fn spawn_command(
 
 fn signal_from_status(status: std::process::ExitStatus) -> Option<i32> {
     #[cfg(unix)]
-    {
-        return status.signal();
-    }
+    return status.signal();
 
     #[cfg(not(unix))]
     {
@@ -442,12 +440,10 @@ pub fn serve(
         events
             .for_each(move |event| {
                 match event {
-                    CommandEvent::Stdout(line_bytes) => {
-                        let line = String::from_utf8_lossy(&line_bytes);
+                    CommandEvent::Stdout(line) => {
                         tracing::info!("{line}");
                     }
-                    CommandEvent::Stderr(line_bytes) => {
-                        let line = String::from_utf8_lossy(&line_bytes);
+                    CommandEvent::Stderr(line) => {
                         tracing::info!("{line}");
                     }
                     CommandEvent::Error(err) => {
@@ -499,11 +495,7 @@ pub mod sqlite_migration {
             }
 
             future::ready(match &event {
-                CommandEvent::Stdout(stdout) => {
-                    let Ok(s) = str::from_utf8(stdout) else {
-                        return future::ready(None);
-                    };
-
+                CommandEvent::Stdout(s) | CommandEvent::Stderr(s) => {
                     if let Some(s) = s.strip_prefix("sqlite-migration:").map(|s| s.trim()) {
                         if let Ok(progress) = s.parse::<u8>() {
                             let _ = SqliteMigrationProgress::InProgress(progress).emit(&app);
@@ -522,3 +514,41 @@ pub mod sqlite_migration {
         })
     }
 }
+
+fn spawn_pipe_reader<F: Fn(String) -> CommandEvent + Send + Copy + 'static>(
+    tx: mpsc::Sender<CommandEvent>,
+    guard: Arc<tokio::sync::RwLock<()>>,
+    pipe_reader: impl AsyncBufRead + Send + Unpin + 'static,
+    wrapper: F,
+) -> JoinHandle<()> {
+    tokio::spawn(async move {
+        let _lock = guard.read().await;
+        let reader = BufReader::new(pipe_reader);
+
+        read_line(reader, tx, wrapper).await;
+    })
+}
+
+async fn read_line<F: Fn(String) -> CommandEvent + Send + Copy + 'static>(
+    reader: BufReader<impl AsyncBufRead + Unpin>,
+    tx: mpsc::Sender<CommandEvent>,
+    wrapper: F,
+) {
+    let mut lines = reader.lines();
+    loop {
+        let line = lines.next_line().await;
+
+        match line {
+            Ok(s) => {
+                if let Some(s) = s {
+                    let _ = tx.clone().send(wrapper(s)).await;
+                }
+            }
+            Err(e) => {
+                let tx_ = tx.clone();
+                let _ = tx_.send(CommandEvent::Error(e.to_string())).await;
+                break;
+            }
+        }
+    }
+}