// command_client.go

  1. package libbox
  2. import (
  3. "context"
  4. "net"
  5. "os"
  6. "path/filepath"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/sagernet/sing-box/daemon"
  11. "github.com/sagernet/sing/common"
  12. E "github.com/sagernet/sing/common/exceptions"
  13. "google.golang.org/grpc"
  14. "google.golang.org/grpc/credentials/insecure"
  15. "google.golang.org/grpc/metadata"
  16. "google.golang.org/protobuf/types/known/emptypb"
  17. )
  18. type CommandClient struct {
  19. handler CommandClientHandler
  20. grpcConn *grpc.ClientConn
  21. grpcClient daemon.StartedServiceClient
  22. options CommandClientOptions
  23. ctx context.Context
  24. cancel context.CancelFunc
  25. clientMutex sync.RWMutex
  26. standalone bool
  27. }
  // CommandClientOptions selects which command streams a CommandClient starts
  // once it is connected.
  type CommandClientOptions struct {
  	// commands holds the command identifiers collected through AddCommand.
  	commands []int32
  	// StatusInterval is forwarded as the Interval field of the status and
  	// connections subscription requests. Its unit is defined by the server.
  	StatusInterval int64
  }

  // AddCommand appends command to the list of commands to dispatch after
  // connecting. Duplicates are not filtered.
  func (o *CommandClientOptions) AddCommand(command int32) {
  	registered := o.commands
  	registered = append(registered, command)
  	o.commands = registered
  }
  35. type CommandClientHandler interface {
  36. Connected()
  37. Disconnected(message string)
  38. SetDefaultLogLevel(level int32)
  39. ClearLogs()
  40. WriteLogs(messageList LogIterator)
  41. WriteStatus(message *StatusMessage)
  42. WriteGroups(message OutboundGroupIterator)
  43. InitializeClashMode(modeList StringIterator, currentMode string)
  44. UpdateClashMode(newMode string)
  45. WriteConnectionEvents(events *ConnectionEvents)
  46. }
  // LogEntry is a single log line delivered to CommandClientHandler.WriteLogs.
  type LogEntry struct {
  	// Level is the numeric log level copied from the server message.
  	Level int32
  	// Message is the log text.
  	Message string
  }
  51. type LogIterator interface {
  52. Len() int32
  53. HasNext() bool
  54. Next() *LogEntry
  55. }
  // XPCDialer opens a connection to the command server over XPC and hands
  // back the file descriptor of the resulting socket.
  type XPCDialer interface {
  	DialXPC() (int32, error)
  }

  // sXPCDialer is the process-wide dialer; dialTarget prefers it whenever it
  // is non-nil.
  var sXPCDialer XPCDialer

  // SetXPCDialer installs dialer as the process-wide XPC dialer. Passing nil
  // removes it.
  func SetXPCDialer(dialer XPCDialer) { sXPCDialer = dialer }
  63. func NewStandaloneCommandClient() *CommandClient {
  64. return &CommandClient{standalone: true}
  65. }
  66. func NewCommandClient(handler CommandClientHandler, options *CommandClientOptions) *CommandClient {
  67. return &CommandClient{
  68. handler: handler,
  69. options: common.PtrValueOrDefault(options),
  70. }
  71. }
  72. func unaryClientAuthInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  73. if sCommandServerSecret != "" {
  74. ctx = metadata.AppendToOutgoingContext(ctx, "x-command-secret", sCommandServerSecret)
  75. }
  76. return invoker(ctx, method, req, reply, cc, opts...)
  77. }
  78. func streamClientAuthInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  79. if sCommandServerSecret != "" {
  80. ctx = metadata.AppendToOutgoingContext(ctx, "x-command-secret", sCommandServerSecret)
  81. }
  82. return streamer(ctx, desc, cc, method, opts...)
  83. }
  // Retry policy used by dialWithRetry.
  const (
  	// commandClientDialAttempts is the number of connection attempts.
  	commandClientDialAttempts = 10
  	// commandClientDialBaseDelay is the delay used for attempt 0.
  	commandClientDialBaseDelay = 100 * time.Millisecond
  	// commandClientDialStepDelay is added once per attempt already made.
  	commandClientDialStepDelay = 50 * time.Millisecond
  )

  // commandClientDialDelay returns the linearly growing delay for the given
  // zero-based attempt: the base delay plus attempt times the step delay.
  func commandClientDialDelay(attempt int) time.Duration {
  	growth := time.Duration(attempt) * commandClientDialStepDelay
  	return commandClientDialBaseDelay + growth
  }
  92. func dialTarget() (string, func(context.Context, string) (net.Conn, error)) {
  93. if sXPCDialer != nil {
  94. return "passthrough:///xpc", func(ctx context.Context, _ string) (net.Conn, error) {
  95. fileDescriptor, err := sXPCDialer.DialXPC()
  96. if err != nil {
  97. return nil, err
  98. }
  99. return networkConnectionFromFileDescriptor(fileDescriptor)
  100. }
  101. }
  102. if sCommandServerListenPort == 0 {
  103. socketPath := filepath.Join(sBasePath, "command.sock")
  104. return "passthrough:///command-socket", func(ctx context.Context, _ string) (net.Conn, error) {
  105. var networkDialer net.Dialer
  106. return networkDialer.DialContext(ctx, "unix", socketPath)
  107. }
  108. }
  109. return net.JoinHostPort("127.0.0.1", strconv.Itoa(int(sCommandServerListenPort))), nil
  110. }
  111. func networkConnectionFromFileDescriptor(fileDescriptor int32) (net.Conn, error) {
  112. file := os.NewFile(uintptr(fileDescriptor), "xpc-command-socket")
  113. if file == nil {
  114. return nil, E.New("invalid file descriptor")
  115. }
  116. networkConnection, err := net.FileConn(file)
  117. if err != nil {
  118. file.Close()
  119. return nil, E.Cause(err, "create connection from fd")
  120. }
  121. file.Close()
  122. return networkConnection, nil
  123. }
  124. func (c *CommandClient) dialWithRetry(target string, contextDialer func(context.Context, string) (net.Conn, error), retryDial bool) (*grpc.ClientConn, daemon.StartedServiceClient, error) {
  125. var connection *grpc.ClientConn
  126. var client daemon.StartedServiceClient
  127. var lastError error
  128. for attempt := 0; attempt < commandClientDialAttempts; attempt++ {
  129. if connection == nil {
  130. options := []grpc.DialOption{
  131. grpc.WithTransportCredentials(insecure.NewCredentials()),
  132. grpc.WithUnaryInterceptor(unaryClientAuthInterceptor),
  133. grpc.WithStreamInterceptor(streamClientAuthInterceptor),
  134. }
  135. if contextDialer != nil {
  136. options = append(options, grpc.WithContextDialer(contextDialer))
  137. }
  138. var err error
  139. connection, err = grpc.NewClient(target, options...)
  140. if err != nil {
  141. lastError = err
  142. if !retryDial {
  143. return nil, nil, err
  144. }
  145. time.Sleep(commandClientDialDelay(attempt))
  146. continue
  147. }
  148. client = daemon.NewStartedServiceClient(connection)
  149. }
  150. waitDuration := commandClientDialDelay(attempt)
  151. ctx, cancel := context.WithTimeout(context.Background(), waitDuration)
  152. _, err := client.GetStartedAt(ctx, &emptypb.Empty{}, grpc.WaitForReady(true))
  153. cancel()
  154. if err == nil {
  155. return connection, client, nil
  156. }
  157. lastError = err
  158. }
  159. if connection != nil {
  160. connection.Close()
  161. }
  162. return nil, nil, lastError
  163. }
  164. func (c *CommandClient) Connect() error {
  165. c.clientMutex.Lock()
  166. common.Close(common.PtrOrNil(c.grpcConn))
  167. target, contextDialer := dialTarget()
  168. connection, client, err := c.dialWithRetry(target, contextDialer, true)
  169. if err != nil {
  170. c.clientMutex.Unlock()
  171. return err
  172. }
  173. c.grpcConn = connection
  174. c.grpcClient = client
  175. c.ctx, c.cancel = context.WithCancel(context.Background())
  176. c.clientMutex.Unlock()
  177. c.handler.Connected()
  178. return c.dispatchCommands()
  179. }
  180. func (c *CommandClient) ConnectWithFD(fd int32) error {
  181. c.clientMutex.Lock()
  182. common.Close(common.PtrOrNil(c.grpcConn))
  183. networkConnection, err := networkConnectionFromFileDescriptor(fd)
  184. if err != nil {
  185. c.clientMutex.Unlock()
  186. return err
  187. }
  188. connection, client, err := c.dialWithRetry("passthrough:///xpc", func(ctx context.Context, _ string) (net.Conn, error) {
  189. return networkConnection, nil
  190. }, false)
  191. if err != nil {
  192. networkConnection.Close()
  193. c.clientMutex.Unlock()
  194. return err
  195. }
  196. c.grpcConn = connection
  197. c.grpcClient = client
  198. c.ctx, c.cancel = context.WithCancel(context.Background())
  199. c.clientMutex.Unlock()
  200. c.handler.Connected()
  201. return c.dispatchCommands()
  202. }
  203. func (c *CommandClient) dispatchCommands() error {
  204. for _, command := range c.options.commands {
  205. switch command {
  206. case CommandLog:
  207. go c.handleLogStream()
  208. case CommandStatus:
  209. go c.handleStatusStream()
  210. case CommandGroup:
  211. go c.handleGroupStream()
  212. case CommandClashMode:
  213. go c.handleClashModeStream()
  214. case CommandConnections:
  215. go c.handleConnectionsStream()
  216. default:
  217. return E.New("unknown command: ", command)
  218. }
  219. }
  220. return nil
  221. }
  222. func (c *CommandClient) Disconnect() error {
  223. c.clientMutex.Lock()
  224. defer c.clientMutex.Unlock()
  225. if c.cancel != nil {
  226. c.cancel()
  227. }
  228. return common.Close(common.PtrOrNil(c.grpcConn))
  229. }
  230. func (c *CommandClient) getClientForCall() (daemon.StartedServiceClient, error) {
  231. c.clientMutex.RLock()
  232. if c.grpcClient != nil {
  233. defer c.clientMutex.RUnlock()
  234. return c.grpcClient, nil
  235. }
  236. c.clientMutex.RUnlock()
  237. c.clientMutex.Lock()
  238. defer c.clientMutex.Unlock()
  239. if c.grpcClient != nil {
  240. return c.grpcClient, nil
  241. }
  242. target, contextDialer := dialTarget()
  243. connection, client, err := c.dialWithRetry(target, contextDialer, true)
  244. if err != nil {
  245. return nil, err
  246. }
  247. c.grpcConn = connection
  248. c.grpcClient = client
  249. if c.ctx == nil {
  250. c.ctx, c.cancel = context.WithCancel(context.Background())
  251. }
  252. return c.grpcClient, nil
  253. }
  254. func (c *CommandClient) closeConnection() {
  255. c.clientMutex.Lock()
  256. defer c.clientMutex.Unlock()
  257. if c.grpcConn != nil {
  258. c.grpcConn.Close()
  259. c.grpcConn = nil
  260. c.grpcClient = nil
  261. }
  262. }
  263. func callWithResult[T any](c *CommandClient, call func(client daemon.StartedServiceClient) (T, error)) (T, error) {
  264. client, err := c.getClientForCall()
  265. if err != nil {
  266. var zero T
  267. return zero, err
  268. }
  269. if c.standalone {
  270. defer c.closeConnection()
  271. }
  272. return call(client)
  273. }
  274. func (c *CommandClient) getStreamContext() (daemon.StartedServiceClient, context.Context) {
  275. c.clientMutex.RLock()
  276. defer c.clientMutex.RUnlock()
  277. return c.grpcClient, c.ctx
  278. }
  279. func (c *CommandClient) handleLogStream() {
  280. client, ctx := c.getStreamContext()
  281. stream, err := client.SubscribeLog(ctx, &emptypb.Empty{})
  282. if err != nil {
  283. c.handler.Disconnected(err.Error())
  284. return
  285. }
  286. defaultLogLevel, err := client.GetDefaultLogLevel(ctx, &emptypb.Empty{})
  287. if err != nil {
  288. c.handler.Disconnected(err.Error())
  289. return
  290. }
  291. c.handler.SetDefaultLogLevel(int32(defaultLogLevel.Level))
  292. for {
  293. logMessage, err := stream.Recv()
  294. if err != nil {
  295. c.handler.Disconnected(err.Error())
  296. return
  297. }
  298. if logMessage.Reset_ {
  299. c.handler.ClearLogs()
  300. }
  301. var messages []*LogEntry
  302. for _, msg := range logMessage.Messages {
  303. messages = append(messages, &LogEntry{
  304. Level: int32(msg.Level),
  305. Message: msg.Message,
  306. })
  307. }
  308. c.handler.WriteLogs(newIterator(messages))
  309. }
  310. }
  311. func (c *CommandClient) handleStatusStream() {
  312. client, ctx := c.getStreamContext()
  313. interval := c.options.StatusInterval
  314. stream, err := client.SubscribeStatus(ctx, &daemon.SubscribeStatusRequest{
  315. Interval: interval,
  316. })
  317. if err != nil {
  318. c.handler.Disconnected(err.Error())
  319. return
  320. }
  321. for {
  322. status, err := stream.Recv()
  323. if err != nil {
  324. c.handler.Disconnected(err.Error())
  325. return
  326. }
  327. c.handler.WriteStatus(statusMessageFromGRPC(status))
  328. }
  329. }
  330. func (c *CommandClient) handleGroupStream() {
  331. client, ctx := c.getStreamContext()
  332. stream, err := client.SubscribeGroups(ctx, &emptypb.Empty{})
  333. if err != nil {
  334. c.handler.Disconnected(err.Error())
  335. return
  336. }
  337. for {
  338. groups, err := stream.Recv()
  339. if err != nil {
  340. c.handler.Disconnected(err.Error())
  341. return
  342. }
  343. c.handler.WriteGroups(outboundGroupIteratorFromGRPC(groups))
  344. }
  345. }
  346. func (c *CommandClient) handleClashModeStream() {
  347. client, ctx := c.getStreamContext()
  348. modeStatus, err := client.GetClashModeStatus(ctx, &emptypb.Empty{})
  349. if err != nil {
  350. c.handler.Disconnected(err.Error())
  351. return
  352. }
  353. if sFixAndroidStack {
  354. go func() {
  355. c.handler.InitializeClashMode(newIterator(modeStatus.ModeList), modeStatus.CurrentMode)
  356. if len(modeStatus.ModeList) == 0 {
  357. c.handler.Disconnected(os.ErrInvalid.Error())
  358. }
  359. }()
  360. } else {
  361. c.handler.InitializeClashMode(newIterator(modeStatus.ModeList), modeStatus.CurrentMode)
  362. if len(modeStatus.ModeList) == 0 {
  363. c.handler.Disconnected(os.ErrInvalid.Error())
  364. return
  365. }
  366. }
  367. if len(modeStatus.ModeList) == 0 {
  368. return
  369. }
  370. stream, err := client.SubscribeClashMode(ctx, &emptypb.Empty{})
  371. if err != nil {
  372. c.handler.Disconnected(err.Error())
  373. return
  374. }
  375. for {
  376. mode, err := stream.Recv()
  377. if err != nil {
  378. c.handler.Disconnected(err.Error())
  379. return
  380. }
  381. c.handler.UpdateClashMode(mode.Mode)
  382. }
  383. }
  384. func (c *CommandClient) handleConnectionsStream() {
  385. client, ctx := c.getStreamContext()
  386. interval := c.options.StatusInterval
  387. stream, err := client.SubscribeConnections(ctx, &daemon.SubscribeConnectionsRequest{
  388. Interval: interval,
  389. })
  390. if err != nil {
  391. c.handler.Disconnected(err.Error())
  392. return
  393. }
  394. for {
  395. events, err := stream.Recv()
  396. if err != nil {
  397. c.handler.Disconnected(err.Error())
  398. return
  399. }
  400. libboxEvents := connectionEventsFromGRPC(events)
  401. c.handler.WriteConnectionEvents(libboxEvents)
  402. }
  403. }
  404. func (c *CommandClient) SelectOutbound(groupTag string, outboundTag string) error {
  405. _, err := callWithResult(c, func(client daemon.StartedServiceClient) (*emptypb.Empty, error) {
  406. return client.SelectOutbound(context.Background(), &daemon.SelectOutboundRequest{
  407. GroupTag: groupTag,
  408. OutboundTag: outboundTag,
  409. })
  410. })
  411. return err
  412. }
  413. func (c *CommandClient) URLTest(groupTag string) error {
  414. _, err := callWithResult(c, func(client daemon.StartedServiceClient) (*emptypb.Empty, error) {
  415. return client.URLTest(context.Background(), &daemon.URLTestRequest{
  416. OutboundTag: groupTag,
  417. })
  418. })
  419. return err
  420. }
  421. func (c *CommandClient) SetClashMode(newMode string) error {
  422. _, err := callWithResult(c, func(client daemon.StartedServiceClient) (*emptypb.Empty, error) {
  423. return client.SetClashMode(context.Background(), &daemon.ClashMode{
  424. Mode: newMode,
  425. })
  426. })
  427. return err
  428. }
  429. func (c *CommandClient) CloseConnection(connId string) error {
  430. _, err := callWithResult(c, func(client daemon.StartedServiceClient) (*emptypb.Empty, error) {
  431. return client.CloseConnection(context.Background(), &daemon.CloseConnectionRequest{
  432. Id: connId,
  433. })
  434. })
  435. return err
  436. }
  437. func (c *CommandClient) CloseConnections() error {
  438. _, err := callWithResult(c, func(client daemon.StartedServiceClient) (*emptypb.Empty, error) {
  439. return client.CloseAllConnections(context.Background(), &emptypb.Empty{})
  440. })
  441. return err
  442. }
  443. func (c *CommandClient) ServiceReload() error {
  444. _, err := callWithResult(c, func(client daemon.StartedServiceClient) (*emptypb.Empty, error) {
  445. return client.ReloadService(context.Background(), &emptypb.Empty{})
  446. })
  447. return err
  448. }
  449. func (c *CommandClient) ServiceClose() error {
  450. _, err := callWithResult(c, func(client daemon.StartedServiceClient) (*emptypb.Empty, error) {
  451. return client.StopService(context.Background(), &emptypb.Empty{})
  452. })
  453. return err
  454. }
  455. func (c *CommandClient) ClearLogs() error {
  456. _, err := callWithResult(c, func(client daemon.StartedServiceClient) (*emptypb.Empty, error) {
  457. return client.ClearLogs(context.Background(), &emptypb.Empty{})
  458. })
  459. return err
  460. }
  461. func (c *CommandClient) GetSystemProxyStatus() (*SystemProxyStatus, error) {
  462. return callWithResult(c, func(client daemon.StartedServiceClient) (*SystemProxyStatus, error) {
  463. status, err := client.GetSystemProxyStatus(context.Background(), &emptypb.Empty{})
  464. if err != nil {
  465. return nil, err
  466. }
  467. return systemProxyStatusFromGRPC(status), nil
  468. })
  469. }
  470. func (c *CommandClient) SetSystemProxyEnabled(isEnabled bool) error {
  471. _, err := callWithResult(c, func(client daemon.StartedServiceClient) (*emptypb.Empty, error) {
  472. return client.SetSystemProxyEnabled(context.Background(), &daemon.SetSystemProxyEnabledRequest{
  473. Enabled: isEnabled,
  474. })
  475. })
  476. return err
  477. }
  478. func (c *CommandClient) GetDeprecatedNotes() (DeprecatedNoteIterator, error) {
  479. return callWithResult(c, func(client daemon.StartedServiceClient) (DeprecatedNoteIterator, error) {
  480. warnings, err := client.GetDeprecatedWarnings(context.Background(), &emptypb.Empty{})
  481. if err != nil {
  482. return nil, err
  483. }
  484. var notes []*DeprecatedNote
  485. for _, warning := range warnings.Warnings {
  486. notes = append(notes, &DeprecatedNote{
  487. Description: warning.Message,
  488. MigrationLink: warning.MigrationLink,
  489. })
  490. }
  491. return newIterator(notes), nil
  492. })
  493. }
  494. func (c *CommandClient) GetStartedAt() (int64, error) {
  495. return callWithResult(c, func(client daemon.StartedServiceClient) (int64, error) {
  496. startedAt, err := client.GetStartedAt(context.Background(), &emptypb.Empty{})
  497. if err != nil {
  498. return 0, err
  499. }
  500. return startedAt.StartedAt, nil
  501. })
  502. }
  503. func (c *CommandClient) SetGroupExpand(groupTag string, isExpand bool) error {
  504. _, err := callWithResult(c, func(client daemon.StartedServiceClient) (*emptypb.Empty, error) {
  505. return client.SetGroupExpand(context.Background(), &daemon.SetGroupExpandRequest{
  506. GroupTag: groupTag,
  507. IsExpand: isExpand,
  508. })
  509. })
  510. return err
  511. }