command_client.go 11 KB


  1. package libbox
  2. import (
  3. "context"
  4. "net"
  5. "os"
  6. "path/filepath"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/sagernet/sing-box/daemon"
  11. "github.com/sagernet/sing/common"
  12. E "github.com/sagernet/sing/common/exceptions"
  13. "google.golang.org/grpc"
  14. "google.golang.org/grpc/credentials/insecure"
  15. "google.golang.org/grpc/metadata"
  16. "google.golang.org/protobuf/types/known/emptypb"
  17. )
  18. type CommandClient struct {
  19. handler CommandClientHandler
  20. grpcConn *grpc.ClientConn
  21. grpcClient daemon.StartedServiceClient
  22. options CommandClientOptions
  23. ctx context.Context
  24. cancel context.CancelFunc
  25. clientMutex sync.RWMutex
  26. }
  27. type CommandClientOptions struct {
  28. commands []int32
  29. StatusInterval int64
  30. }
  31. func (o *CommandClientOptions) AddCommand(command int32) {
  32. o.commands = append(o.commands, command)
  33. }
  34. type CommandClientHandler interface {
  35. Connected()
  36. Disconnected(message string)
  37. SetDefaultLogLevel(level int32)
  38. ClearLogs()
  39. WriteLogs(messageList LogIterator)
  40. WriteStatus(message *StatusMessage)
  41. WriteGroups(message OutboundGroupIterator)
  42. InitializeClashMode(modeList StringIterator, currentMode string)
  43. UpdateClashMode(newMode string)
  44. WriteConnections(message *Connections)
  45. }
  46. type LogEntry struct {
  47. Level int32
  48. Message string
  49. }
  50. type LogIterator interface {
  51. Len() int32
  52. HasNext() bool
  53. Next() *LogEntry
  54. }
  55. func NewStandaloneCommandClient() *CommandClient {
  56. return new(CommandClient)
  57. }
  58. func NewCommandClient(handler CommandClientHandler, options *CommandClientOptions) *CommandClient {
  59. return &CommandClient{
  60. handler: handler,
  61. options: common.PtrValueOrDefault(options),
  62. }
  63. }
  64. func unaryClientAuthInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  65. if sCommandServerSecret != "" {
  66. ctx = metadata.AppendToOutgoingContext(ctx, "x-command-secret", sCommandServerSecret)
  67. }
  68. return invoker(ctx, method, req, reply, cc, opts...)
  69. }
  70. func streamClientAuthInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  71. if sCommandServerSecret != "" {
  72. ctx = metadata.AppendToOutgoingContext(ctx, "x-command-secret", sCommandServerSecret)
  73. }
  74. return streamer(ctx, desc, cc, method, opts...)
  75. }
  76. func (c *CommandClient) grpcDial() (*grpc.ClientConn, error) {
  77. var target string
  78. if sCommandServerListenPort == 0 {
  79. target = "unix://" + filepath.Join(sBasePath, "command.sock")
  80. } else {
  81. target = net.JoinHostPort("127.0.0.1", strconv.Itoa(int(sCommandServerListenPort)))
  82. }
  83. var (
  84. conn *grpc.ClientConn
  85. err error
  86. )
  87. clientOptions := []grpc.DialOption{
  88. grpc.WithTransportCredentials(insecure.NewCredentials()),
  89. grpc.WithUnaryInterceptor(unaryClientAuthInterceptor),
  90. grpc.WithStreamInterceptor(streamClientAuthInterceptor),
  91. }
  92. for i := 0; i < 10; i++ {
  93. conn, err = grpc.NewClient(target, clientOptions...)
  94. if err == nil {
  95. return conn, nil
  96. }
  97. time.Sleep(time.Duration(100+i*50) * time.Millisecond)
  98. }
  99. return nil, err
  100. }
  101. func (c *CommandClient) Connect() error {
  102. c.clientMutex.Lock()
  103. common.Close(common.PtrOrNil(c.grpcConn))
  104. conn, err := c.grpcDial()
  105. if err != nil {
  106. c.clientMutex.Unlock()
  107. return err
  108. }
  109. c.grpcConn = conn
  110. c.grpcClient = daemon.NewStartedServiceClient(conn)
  111. c.ctx, c.cancel = context.WithCancel(context.Background())
  112. c.clientMutex.Unlock()
  113. c.handler.Connected()
  114. for _, command := range c.options.commands {
  115. switch command {
  116. case CommandLog:
  117. go c.handleLogStream()
  118. case CommandStatus:
  119. go c.handleStatusStream()
  120. case CommandGroup:
  121. go c.handleGroupStream()
  122. case CommandClashMode:
  123. go c.handleClashModeStream()
  124. case CommandConnections:
  125. go c.handleConnectionsStream()
  126. default:
  127. return E.New("unknown command: ", command)
  128. }
  129. }
  130. return nil
  131. }
  132. func (c *CommandClient) Disconnect() error {
  133. c.clientMutex.Lock()
  134. defer c.clientMutex.Unlock()
  135. if c.cancel != nil {
  136. c.cancel()
  137. }
  138. return common.Close(common.PtrOrNil(c.grpcConn))
  139. }
  140. func (c *CommandClient) getClientForCall() (daemon.StartedServiceClient, error) {
  141. c.clientMutex.RLock()
  142. if c.grpcClient != nil {
  143. defer c.clientMutex.RUnlock()
  144. return c.grpcClient, nil
  145. }
  146. c.clientMutex.RUnlock()
  147. c.clientMutex.Lock()
  148. defer c.clientMutex.Unlock()
  149. if c.grpcClient != nil {
  150. return c.grpcClient, nil
  151. }
  152. conn, err := c.grpcDial()
  153. if err != nil {
  154. return nil, err
  155. }
  156. c.grpcConn = conn
  157. c.grpcClient = daemon.NewStartedServiceClient(conn)
  158. if c.ctx == nil {
  159. c.ctx, c.cancel = context.WithCancel(context.Background())
  160. }
  161. return c.grpcClient, nil
  162. }
  163. func (c *CommandClient) getStreamContext() (daemon.StartedServiceClient, context.Context) {
  164. c.clientMutex.RLock()
  165. defer c.clientMutex.RUnlock()
  166. return c.grpcClient, c.ctx
  167. }
  168. func (c *CommandClient) handleLogStream() {
  169. client, ctx := c.getStreamContext()
  170. stream, err := client.SubscribeLog(ctx, &emptypb.Empty{})
  171. if err != nil {
  172. c.handler.Disconnected(err.Error())
  173. return
  174. }
  175. defaultLogLevel, err := client.GetDefaultLogLevel(ctx, &emptypb.Empty{})
  176. if err != nil {
  177. c.handler.Disconnected(err.Error())
  178. return
  179. }
  180. c.handler.SetDefaultLogLevel(int32(defaultLogLevel.Level))
  181. for {
  182. logMessage, err := stream.Recv()
  183. if err != nil {
  184. c.handler.Disconnected(err.Error())
  185. return
  186. }
  187. if logMessage.Reset_ {
  188. c.handler.ClearLogs()
  189. }
  190. var messages []*LogEntry
  191. for _, msg := range logMessage.Messages {
  192. messages = append(messages, &LogEntry{
  193. Level: int32(msg.Level),
  194. Message: msg.Message,
  195. })
  196. }
  197. c.handler.WriteLogs(newIterator(messages))
  198. }
  199. }
  200. func (c *CommandClient) handleStatusStream() {
  201. client, ctx := c.getStreamContext()
  202. interval := c.options.StatusInterval
  203. stream, err := client.SubscribeStatus(ctx, &daemon.SubscribeStatusRequest{
  204. Interval: interval,
  205. })
  206. if err != nil {
  207. c.handler.Disconnected(err.Error())
  208. return
  209. }
  210. for {
  211. status, err := stream.Recv()
  212. if err != nil {
  213. c.handler.Disconnected(err.Error())
  214. return
  215. }
  216. c.handler.WriteStatus(StatusMessageFromGRPC(status))
  217. }
  218. }
  219. func (c *CommandClient) handleGroupStream() {
  220. client, ctx := c.getStreamContext()
  221. stream, err := client.SubscribeGroups(ctx, &emptypb.Empty{})
  222. if err != nil {
  223. c.handler.Disconnected(err.Error())
  224. return
  225. }
  226. for {
  227. groups, err := stream.Recv()
  228. if err != nil {
  229. c.handler.Disconnected(err.Error())
  230. return
  231. }
  232. c.handler.WriteGroups(OutboundGroupIteratorFromGRPC(groups))
  233. }
  234. }
  235. func (c *CommandClient) handleClashModeStream() {
  236. client, ctx := c.getStreamContext()
  237. modeStatus, err := client.GetClashModeStatus(ctx, &emptypb.Empty{})
  238. if err != nil {
  239. c.handler.Disconnected(err.Error())
  240. return
  241. }
  242. if sFixAndroidStack {
  243. go func() {
  244. c.handler.InitializeClashMode(newIterator(modeStatus.ModeList), modeStatus.CurrentMode)
  245. if len(modeStatus.ModeList) == 0 {
  246. c.handler.Disconnected(os.ErrInvalid.Error())
  247. }
  248. }()
  249. } else {
  250. c.handler.InitializeClashMode(newIterator(modeStatus.ModeList), modeStatus.CurrentMode)
  251. if len(modeStatus.ModeList) == 0 {
  252. c.handler.Disconnected(os.ErrInvalid.Error())
  253. return
  254. }
  255. }
  256. if len(modeStatus.ModeList) == 0 {
  257. return
  258. }
  259. stream, err := client.SubscribeClashMode(ctx, &emptypb.Empty{})
  260. if err != nil {
  261. c.handler.Disconnected(err.Error())
  262. return
  263. }
  264. for {
  265. mode, err := stream.Recv()
  266. if err != nil {
  267. c.handler.Disconnected(err.Error())
  268. return
  269. }
  270. c.handler.UpdateClashMode(mode.Mode)
  271. }
  272. }
  273. func (c *CommandClient) handleConnectionsStream() {
  274. client, ctx := c.getStreamContext()
  275. interval := c.options.StatusInterval
  276. stream, err := client.SubscribeConnections(ctx, &daemon.SubscribeConnectionsRequest{
  277. Interval: interval,
  278. })
  279. if err != nil {
  280. c.handler.Disconnected(err.Error())
  281. return
  282. }
  283. var connections Connections
  284. for {
  285. conns, err := stream.Recv()
  286. if err != nil {
  287. c.handler.Disconnected(err.Error())
  288. return
  289. }
  290. connections.input = ConnectionsFromGRPC(conns)
  291. c.handler.WriteConnections(&connections)
  292. }
  293. }
  294. func (c *CommandClient) SelectOutbound(groupTag string, outboundTag string) error {
  295. client, err := c.getClientForCall()
  296. if err != nil {
  297. return err
  298. }
  299. _, err = client.SelectOutbound(context.Background(), &daemon.SelectOutboundRequest{
  300. GroupTag: groupTag,
  301. OutboundTag: outboundTag,
  302. })
  303. return err
  304. }
  305. func (c *CommandClient) URLTest(groupTag string) error {
  306. client, err := c.getClientForCall()
  307. if err != nil {
  308. return err
  309. }
  310. _, err = client.URLTest(context.Background(), &daemon.URLTestRequest{
  311. OutboundTag: groupTag,
  312. })
  313. return err
  314. }
  315. func (c *CommandClient) SetClashMode(newMode string) error {
  316. client, err := c.getClientForCall()
  317. if err != nil {
  318. return err
  319. }
  320. _, err = client.SetClashMode(context.Background(), &daemon.ClashMode{
  321. Mode: newMode,
  322. })
  323. return err
  324. }
  325. func (c *CommandClient) CloseConnection(connId string) error {
  326. client, err := c.getClientForCall()
  327. if err != nil {
  328. return err
  329. }
  330. _, err = client.CloseConnection(context.Background(), &daemon.CloseConnectionRequest{
  331. Id: connId,
  332. })
  333. return err
  334. }
  335. func (c *CommandClient) CloseConnections() error {
  336. client, err := c.getClientForCall()
  337. if err != nil {
  338. return err
  339. }
  340. _, err = client.CloseAllConnections(context.Background(), &emptypb.Empty{})
  341. return err
  342. }
  343. func (c *CommandClient) ServiceReload() error {
  344. client, err := c.getClientForCall()
  345. if err != nil {
  346. return err
  347. }
  348. _, err = client.ReloadService(context.Background(), &emptypb.Empty{})
  349. return err
  350. }
  351. func (c *CommandClient) ServiceClose() error {
  352. client, err := c.getClientForCall()
  353. if err != nil {
  354. return err
  355. }
  356. _, err = client.StopService(context.Background(), &emptypb.Empty{})
  357. return err
  358. }
  359. func (c *CommandClient) GetSystemProxyStatus() (*SystemProxyStatus, error) {
  360. client, err := c.getClientForCall()
  361. if err != nil {
  362. return nil, err
  363. }
  364. status, err := client.GetSystemProxyStatus(context.Background(), &emptypb.Empty{})
  365. if err != nil {
  366. return nil, err
  367. }
  368. return SystemProxyStatusFromGRPC(status), nil
  369. }
  370. func (c *CommandClient) SetSystemProxyEnabled(isEnabled bool) error {
  371. client, err := c.getClientForCall()
  372. if err != nil {
  373. return err
  374. }
  375. _, err = client.SetSystemProxyEnabled(context.Background(), &daemon.SetSystemProxyEnabledRequest{
  376. Enabled: isEnabled,
  377. })
  378. return err
  379. }
  380. func (c *CommandClient) GetDeprecatedNotes() (DeprecatedNoteIterator, error) {
  381. client, err := c.getClientForCall()
  382. if err != nil {
  383. return nil, err
  384. }
  385. warnings, err := client.GetDeprecatedWarnings(context.Background(), &emptypb.Empty{})
  386. if err != nil {
  387. return nil, err
  388. }
  389. var notes []*DeprecatedNote
  390. for _, warning := range warnings.Warnings {
  391. notes = append(notes, &DeprecatedNote{
  392. Description: warning.Message,
  393. MigrationLink: warning.MigrationLink,
  394. })
  395. }
  396. return newIterator(notes), nil
  397. }
  398. func (c *CommandClient) SetGroupExpand(groupTag string, isExpand bool) error {
  399. client, err := c.getClientForCall()
  400. if err != nil {
  401. return err
  402. }
  403. _, err = client.SetGroupExpand(context.Background(), &daemon.SetGroupExpandRequest{
  404. GroupTag: groupTag,
  405. IsExpand: isExpand,
  406. })
  407. return err
  408. }