command_client.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. package libbox
  2. import (
  3. "context"
  4. "net"
  5. "os"
  6. "path/filepath"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/sagernet/sing-box/daemon"
  11. "github.com/sagernet/sing/common"
  12. E "github.com/sagernet/sing/common/exceptions"
  13. "google.golang.org/grpc"
  14. "google.golang.org/grpc/credentials/insecure"
  15. "google.golang.org/grpc/metadata"
  16. "google.golang.org/protobuf/types/known/emptypb"
  17. )
  18. type CommandClient struct {
  19. handler CommandClientHandler
  20. grpcConn *grpc.ClientConn
  21. grpcClient daemon.StartedServiceClient
  22. options CommandClientOptions
  23. ctx context.Context
  24. cancel context.CancelFunc
  25. clientMutex sync.RWMutex
  26. }
  27. type CommandClientOptions struct {
  28. commands []int32
  29. StatusInterval int64
  30. }
  31. func (o *CommandClientOptions) AddCommand(command int32) {
  32. o.commands = append(o.commands, command)
  33. }
  34. type CommandClientHandler interface {
  35. Connected()
  36. Disconnected(message string)
  37. SetDefaultLogLevel(level int32)
  38. ClearLogs()
  39. WriteLogs(messageList LogIterator)
  40. WriteStatus(message *StatusMessage)
  41. WriteGroups(message OutboundGroupIterator)
  42. InitializeClashMode(modeList StringIterator, currentMode string)
  43. UpdateClashMode(newMode string)
  44. WriteConnections(message *Connections)
  45. }
  46. type LogEntry struct {
  47. Level int32
  48. Message string
  49. }
  50. type LogIterator interface {
  51. Len() int32
  52. HasNext() bool
  53. Next() *LogEntry
  54. }
  55. type XPCDialer interface {
  56. DialXPC() (int32, error)
  57. }
  58. var sXPCDialer XPCDialer
  59. func SetXPCDialer(dialer XPCDialer) {
  60. sXPCDialer = dialer
  61. }
  62. func NewStandaloneCommandClient() *CommandClient {
  63. return new(CommandClient)
  64. }
  65. func NewCommandClient(handler CommandClientHandler, options *CommandClientOptions) *CommandClient {
  66. return &CommandClient{
  67. handler: handler,
  68. options: common.PtrValueOrDefault(options),
  69. }
  70. }
  71. func unaryClientAuthInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  72. if sCommandServerSecret != "" {
  73. ctx = metadata.AppendToOutgoingContext(ctx, "x-command-secret", sCommandServerSecret)
  74. }
  75. return invoker(ctx, method, req, reply, cc, opts...)
  76. }
  77. func streamClientAuthInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  78. if sCommandServerSecret != "" {
  79. ctx = metadata.AppendToOutgoingContext(ctx, "x-command-secret", sCommandServerSecret)
  80. }
  81. return streamer(ctx, desc, cc, method, opts...)
  82. }
  83. func (c *CommandClient) grpcDial() (*grpc.ClientConn, error) {
  84. var target string
  85. if sCommandServerListenPort == 0 {
  86. target = "unix://" + filepath.Join(sBasePath, "command.sock")
  87. } else {
  88. target = net.JoinHostPort("127.0.0.1", strconv.Itoa(int(sCommandServerListenPort)))
  89. }
  90. var (
  91. conn *grpc.ClientConn
  92. err error
  93. )
  94. clientOptions := []grpc.DialOption{
  95. grpc.WithTransportCredentials(insecure.NewCredentials()),
  96. grpc.WithUnaryInterceptor(unaryClientAuthInterceptor),
  97. grpc.WithStreamInterceptor(streamClientAuthInterceptor),
  98. }
  99. for i := 0; i < 10; i++ {
  100. conn, err = grpc.NewClient(target, clientOptions...)
  101. if err == nil {
  102. return conn, nil
  103. }
  104. time.Sleep(time.Duration(100+i*50) * time.Millisecond)
  105. }
  106. return nil, err
  107. }
  108. func (c *CommandClient) Connect() error {
  109. c.clientMutex.Lock()
  110. common.Close(common.PtrOrNil(c.grpcConn))
  111. if sXPCDialer != nil {
  112. fd, err := sXPCDialer.DialXPC()
  113. if err != nil {
  114. c.clientMutex.Unlock()
  115. return err
  116. }
  117. file := os.NewFile(uintptr(fd), "xpc-command-socket")
  118. if file == nil {
  119. c.clientMutex.Unlock()
  120. return E.New("invalid file descriptor")
  121. }
  122. netConn, err := net.FileConn(file)
  123. if err != nil {
  124. file.Close()
  125. c.clientMutex.Unlock()
  126. return E.Cause(err, "create connection from fd")
  127. }
  128. file.Close()
  129. clientOptions := []grpc.DialOption{
  130. grpc.WithTransportCredentials(insecure.NewCredentials()),
  131. grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
  132. return netConn, nil
  133. }),
  134. grpc.WithUnaryInterceptor(unaryClientAuthInterceptor),
  135. grpc.WithStreamInterceptor(streamClientAuthInterceptor),
  136. }
  137. grpcConn, err := grpc.NewClient("passthrough:///xpc", clientOptions...)
  138. if err != nil {
  139. netConn.Close()
  140. c.clientMutex.Unlock()
  141. return err
  142. }
  143. c.grpcConn = grpcConn
  144. c.grpcClient = daemon.NewStartedServiceClient(grpcConn)
  145. c.ctx, c.cancel = context.WithCancel(context.Background())
  146. c.clientMutex.Unlock()
  147. } else {
  148. conn, err := c.grpcDial()
  149. if err != nil {
  150. c.clientMutex.Unlock()
  151. return err
  152. }
  153. c.grpcConn = conn
  154. c.grpcClient = daemon.NewStartedServiceClient(conn)
  155. c.ctx, c.cancel = context.WithCancel(context.Background())
  156. c.clientMutex.Unlock()
  157. }
  158. c.handler.Connected()
  159. for _, command := range c.options.commands {
  160. switch command {
  161. case CommandLog:
  162. go c.handleLogStream()
  163. case CommandStatus:
  164. go c.handleStatusStream()
  165. case CommandGroup:
  166. go c.handleGroupStream()
  167. case CommandClashMode:
  168. go c.handleClashModeStream()
  169. case CommandConnections:
  170. go c.handleConnectionsStream()
  171. default:
  172. return E.New("unknown command: ", command)
  173. }
  174. }
  175. return nil
  176. }
  177. func (c *CommandClient) ConnectWithFD(fd int32) error {
  178. c.clientMutex.Lock()
  179. common.Close(common.PtrOrNil(c.grpcConn))
  180. file := os.NewFile(uintptr(fd), "xpc-command-socket")
  181. if file == nil {
  182. c.clientMutex.Unlock()
  183. return E.New("invalid file descriptor")
  184. }
  185. netConn, err := net.FileConn(file)
  186. if err != nil {
  187. file.Close()
  188. c.clientMutex.Unlock()
  189. return E.Cause(err, "create connection from fd")
  190. }
  191. file.Close()
  192. clientOptions := []grpc.DialOption{
  193. grpc.WithTransportCredentials(insecure.NewCredentials()),
  194. grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
  195. return netConn, nil
  196. }),
  197. grpc.WithUnaryInterceptor(unaryClientAuthInterceptor),
  198. grpc.WithStreamInterceptor(streamClientAuthInterceptor),
  199. }
  200. grpcConn, err := grpc.NewClient("passthrough:///xpc", clientOptions...)
  201. if err != nil {
  202. netConn.Close()
  203. c.clientMutex.Unlock()
  204. return err
  205. }
  206. c.grpcConn = grpcConn
  207. c.grpcClient = daemon.NewStartedServiceClient(grpcConn)
  208. c.ctx, c.cancel = context.WithCancel(context.Background())
  209. c.clientMutex.Unlock()
  210. c.handler.Connected()
  211. for _, command := range c.options.commands {
  212. switch command {
  213. case CommandLog:
  214. go c.handleLogStream()
  215. case CommandStatus:
  216. go c.handleStatusStream()
  217. case CommandGroup:
  218. go c.handleGroupStream()
  219. case CommandClashMode:
  220. go c.handleClashModeStream()
  221. case CommandConnections:
  222. go c.handleConnectionsStream()
  223. default:
  224. return E.New("unknown command: ", command)
  225. }
  226. }
  227. return nil
  228. }
  229. func (c *CommandClient) Disconnect() error {
  230. c.clientMutex.Lock()
  231. defer c.clientMutex.Unlock()
  232. if c.cancel != nil {
  233. c.cancel()
  234. }
  235. return common.Close(common.PtrOrNil(c.grpcConn))
  236. }
  237. func (c *CommandClient) getClientForCall() (daemon.StartedServiceClient, error) {
  238. c.clientMutex.RLock()
  239. if c.grpcClient != nil {
  240. defer c.clientMutex.RUnlock()
  241. return c.grpcClient, nil
  242. }
  243. c.clientMutex.RUnlock()
  244. c.clientMutex.Lock()
  245. defer c.clientMutex.Unlock()
  246. if c.grpcClient != nil {
  247. return c.grpcClient, nil
  248. }
  249. if sXPCDialer != nil {
  250. fd, err := sXPCDialer.DialXPC()
  251. if err != nil {
  252. return nil, err
  253. }
  254. file := os.NewFile(uintptr(fd), "xpc-command-socket")
  255. if file == nil {
  256. return nil, E.New("invalid file descriptor")
  257. }
  258. netConn, err := net.FileConn(file)
  259. if err != nil {
  260. file.Close()
  261. return nil, E.Cause(err, "create connection from fd")
  262. }
  263. file.Close()
  264. clientOptions := []grpc.DialOption{
  265. grpc.WithTransportCredentials(insecure.NewCredentials()),
  266. grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
  267. return netConn, nil
  268. }),
  269. grpc.WithUnaryInterceptor(unaryClientAuthInterceptor),
  270. grpc.WithStreamInterceptor(streamClientAuthInterceptor),
  271. }
  272. grpcConn, err := grpc.NewClient("passthrough:///xpc", clientOptions...)
  273. if err != nil {
  274. netConn.Close()
  275. return nil, err
  276. }
  277. c.grpcConn = grpcConn
  278. c.grpcClient = daemon.NewStartedServiceClient(grpcConn)
  279. if c.ctx == nil {
  280. c.ctx, c.cancel = context.WithCancel(context.Background())
  281. }
  282. return c.grpcClient, nil
  283. }
  284. conn, err := c.grpcDial()
  285. if err != nil {
  286. return nil, err
  287. }
  288. c.grpcConn = conn
  289. c.grpcClient = daemon.NewStartedServiceClient(conn)
  290. if c.ctx == nil {
  291. c.ctx, c.cancel = context.WithCancel(context.Background())
  292. }
  293. return c.grpcClient, nil
  294. }
  295. func (c *CommandClient) getStreamContext() (daemon.StartedServiceClient, context.Context) {
  296. c.clientMutex.RLock()
  297. defer c.clientMutex.RUnlock()
  298. return c.grpcClient, c.ctx
  299. }
  300. func (c *CommandClient) handleLogStream() {
  301. client, ctx := c.getStreamContext()
  302. stream, err := client.SubscribeLog(ctx, &emptypb.Empty{})
  303. if err != nil {
  304. c.handler.Disconnected(err.Error())
  305. return
  306. }
  307. defaultLogLevel, err := client.GetDefaultLogLevel(ctx, &emptypb.Empty{})
  308. if err != nil {
  309. c.handler.Disconnected(err.Error())
  310. return
  311. }
  312. c.handler.SetDefaultLogLevel(int32(defaultLogLevel.Level))
  313. for {
  314. logMessage, err := stream.Recv()
  315. if err != nil {
  316. c.handler.Disconnected(err.Error())
  317. return
  318. }
  319. if logMessage.Reset_ {
  320. c.handler.ClearLogs()
  321. }
  322. var messages []*LogEntry
  323. for _, msg := range logMessage.Messages {
  324. messages = append(messages, &LogEntry{
  325. Level: int32(msg.Level),
  326. Message: msg.Message,
  327. })
  328. }
  329. c.handler.WriteLogs(newIterator(messages))
  330. }
  331. }
  332. func (c *CommandClient) handleStatusStream() {
  333. client, ctx := c.getStreamContext()
  334. interval := c.options.StatusInterval
  335. stream, err := client.SubscribeStatus(ctx, &daemon.SubscribeStatusRequest{
  336. Interval: interval,
  337. })
  338. if err != nil {
  339. c.handler.Disconnected(err.Error())
  340. return
  341. }
  342. for {
  343. status, err := stream.Recv()
  344. if err != nil {
  345. c.handler.Disconnected(err.Error())
  346. return
  347. }
  348. c.handler.WriteStatus(StatusMessageFromGRPC(status))
  349. }
  350. }
  351. func (c *CommandClient) handleGroupStream() {
  352. client, ctx := c.getStreamContext()
  353. stream, err := client.SubscribeGroups(ctx, &emptypb.Empty{})
  354. if err != nil {
  355. c.handler.Disconnected(err.Error())
  356. return
  357. }
  358. for {
  359. groups, err := stream.Recv()
  360. if err != nil {
  361. c.handler.Disconnected(err.Error())
  362. return
  363. }
  364. c.handler.WriteGroups(OutboundGroupIteratorFromGRPC(groups))
  365. }
  366. }
  367. func (c *CommandClient) handleClashModeStream() {
  368. client, ctx := c.getStreamContext()
  369. modeStatus, err := client.GetClashModeStatus(ctx, &emptypb.Empty{})
  370. if err != nil {
  371. c.handler.Disconnected(err.Error())
  372. return
  373. }
  374. if sFixAndroidStack {
  375. go func() {
  376. c.handler.InitializeClashMode(newIterator(modeStatus.ModeList), modeStatus.CurrentMode)
  377. if len(modeStatus.ModeList) == 0 {
  378. c.handler.Disconnected(os.ErrInvalid.Error())
  379. }
  380. }()
  381. } else {
  382. c.handler.InitializeClashMode(newIterator(modeStatus.ModeList), modeStatus.CurrentMode)
  383. if len(modeStatus.ModeList) == 0 {
  384. c.handler.Disconnected(os.ErrInvalid.Error())
  385. return
  386. }
  387. }
  388. if len(modeStatus.ModeList) == 0 {
  389. return
  390. }
  391. stream, err := client.SubscribeClashMode(ctx, &emptypb.Empty{})
  392. if err != nil {
  393. c.handler.Disconnected(err.Error())
  394. return
  395. }
  396. for {
  397. mode, err := stream.Recv()
  398. if err != nil {
  399. c.handler.Disconnected(err.Error())
  400. return
  401. }
  402. c.handler.UpdateClashMode(mode.Mode)
  403. }
  404. }
  405. func (c *CommandClient) handleConnectionsStream() {
  406. client, ctx := c.getStreamContext()
  407. interval := c.options.StatusInterval
  408. stream, err := client.SubscribeConnections(ctx, &daemon.SubscribeConnectionsRequest{
  409. Interval: interval,
  410. })
  411. if err != nil {
  412. c.handler.Disconnected(err.Error())
  413. return
  414. }
  415. var connections Connections
  416. for {
  417. conns, err := stream.Recv()
  418. if err != nil {
  419. c.handler.Disconnected(err.Error())
  420. return
  421. }
  422. connections.input = ConnectionsFromGRPC(conns)
  423. c.handler.WriteConnections(&connections)
  424. }
  425. }
  426. func (c *CommandClient) SelectOutbound(groupTag string, outboundTag string) error {
  427. client, err := c.getClientForCall()
  428. if err != nil {
  429. return err
  430. }
  431. _, err = client.SelectOutbound(context.Background(), &daemon.SelectOutboundRequest{
  432. GroupTag: groupTag,
  433. OutboundTag: outboundTag,
  434. })
  435. return err
  436. }
  437. func (c *CommandClient) URLTest(groupTag string) error {
  438. client, err := c.getClientForCall()
  439. if err != nil {
  440. return err
  441. }
  442. _, err = client.URLTest(context.Background(), &daemon.URLTestRequest{
  443. OutboundTag: groupTag,
  444. })
  445. return err
  446. }
  447. func (c *CommandClient) SetClashMode(newMode string) error {
  448. client, err := c.getClientForCall()
  449. if err != nil {
  450. return err
  451. }
  452. _, err = client.SetClashMode(context.Background(), &daemon.ClashMode{
  453. Mode: newMode,
  454. })
  455. return err
  456. }
  457. func (c *CommandClient) CloseConnection(connId string) error {
  458. client, err := c.getClientForCall()
  459. if err != nil {
  460. return err
  461. }
  462. _, err = client.CloseConnection(context.Background(), &daemon.CloseConnectionRequest{
  463. Id: connId,
  464. })
  465. return err
  466. }
  467. func (c *CommandClient) CloseConnections() error {
  468. client, err := c.getClientForCall()
  469. if err != nil {
  470. return err
  471. }
  472. _, err = client.CloseAllConnections(context.Background(), &emptypb.Empty{})
  473. return err
  474. }
  475. func (c *CommandClient) ServiceReload() error {
  476. client, err := c.getClientForCall()
  477. if err != nil {
  478. return err
  479. }
  480. _, err = client.ReloadService(context.Background(), &emptypb.Empty{})
  481. return err
  482. }
  483. func (c *CommandClient) ServiceClose() error {
  484. client, err := c.getClientForCall()
  485. if err != nil {
  486. return err
  487. }
  488. _, err = client.StopService(context.Background(), &emptypb.Empty{})
  489. return err
  490. }
  491. func (c *CommandClient) ClearLogs() error {
  492. client, err := c.getClientForCall()
  493. if err != nil {
  494. return err
  495. }
  496. _, err = client.ClearLogs(context.Background(), &emptypb.Empty{})
  497. return err
  498. }
  499. func (c *CommandClient) GetSystemProxyStatus() (*SystemProxyStatus, error) {
  500. client, err := c.getClientForCall()
  501. if err != nil {
  502. return nil, err
  503. }
  504. status, err := client.GetSystemProxyStatus(context.Background(), &emptypb.Empty{})
  505. if err != nil {
  506. return nil, err
  507. }
  508. return SystemProxyStatusFromGRPC(status), nil
  509. }
  510. func (c *CommandClient) SetSystemProxyEnabled(isEnabled bool) error {
  511. client, err := c.getClientForCall()
  512. if err != nil {
  513. return err
  514. }
  515. _, err = client.SetSystemProxyEnabled(context.Background(), &daemon.SetSystemProxyEnabledRequest{
  516. Enabled: isEnabled,
  517. })
  518. return err
  519. }
  520. func (c *CommandClient) GetDeprecatedNotes() (DeprecatedNoteIterator, error) {
  521. client, err := c.getClientForCall()
  522. if err != nil {
  523. return nil, err
  524. }
  525. warnings, err := client.GetDeprecatedWarnings(context.Background(), &emptypb.Empty{})
  526. if err != nil {
  527. return nil, err
  528. }
  529. var notes []*DeprecatedNote
  530. for _, warning := range warnings.Warnings {
  531. notes = append(notes, &DeprecatedNote{
  532. Description: warning.Message,
  533. MigrationLink: warning.MigrationLink,
  534. })
  535. }
  536. return newIterator(notes), nil
  537. }
  538. func (c *CommandClient) GetStartedAt() (int64, error) {
  539. client, err := c.getClientForCall()
  540. if err != nil {
  541. return 0, err
  542. }
  543. startedAt, err := client.GetStartedAt(context.Background(), &emptypb.Empty{})
  544. if err != nil {
  545. return 0, err
  546. }
  547. return startedAt.StartedAt, nil
  548. }
  549. func (c *CommandClient) SetGroupExpand(groupTag string, isExpand bool) error {
  550. client, err := c.getClientForCall()
  551. if err != nil {
  552. return err
  553. }
  554. _, err = client.SetGroupExpand(context.Background(), &daemon.SetGroupExpandRequest{
  555. GroupTag: groupTag,
  556. IsExpand: isExpand,
  557. })
  558. return err
  559. }