started_service.go 24 KB


  1. package daemon
  2. import (
  3. "context"
  4. "os"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "github.com/sagernet/sing-box/adapter"
  9. "github.com/sagernet/sing-box/common/conntrack"
  10. "github.com/sagernet/sing-box/common/urltest"
  11. "github.com/sagernet/sing-box/experimental/clashapi"
  12. "github.com/sagernet/sing-box/experimental/clashapi/trafficontrol"
  13. "github.com/sagernet/sing-box/experimental/deprecated"
  14. "github.com/sagernet/sing-box/log"
  15. "github.com/sagernet/sing-box/protocol/group"
  16. "github.com/sagernet/sing/common"
  17. "github.com/sagernet/sing/common/batch"
  18. E "github.com/sagernet/sing/common/exceptions"
  19. "github.com/sagernet/sing/common/memory"
  20. "github.com/sagernet/sing/common/observable"
  21. "github.com/sagernet/sing/common/x/list"
  22. "github.com/sagernet/sing/service"
  23. "github.com/gofrs/uuid/v5"
  24. "google.golang.org/grpc"
  25. "google.golang.org/protobuf/types/known/emptypb"
  26. )
  27. var _ StartedServiceServer = (*StartedService)(nil)
  28. type StartedService struct {
  29. ctx context.Context
  30. // platform adapter.PlatformInterface
  31. handler PlatformHandler
  32. debug bool
  33. logMaxLines int
  34. // workingDirectory string
  35. // tempDirectory string
  36. // userID int
  37. // groupID int
  38. // systemProxyEnabled bool
  39. serviceAccess sync.RWMutex
  40. serviceStatus *ServiceStatus
  41. serviceStatusSubscriber *observable.Subscriber[*ServiceStatus]
  42. serviceStatusObserver *observable.Observer[*ServiceStatus]
  43. logAccess sync.RWMutex
  44. logLines list.List[*log.Entry]
  45. logSubscriber *observable.Subscriber[*log.Entry]
  46. logObserver *observable.Observer[*log.Entry]
  47. instance *Instance
  48. urlTestSubscriber *observable.Subscriber[struct{}]
  49. urlTestObserver *observable.Observer[struct{}]
  50. urlTestHistoryStorage *urltest.HistoryStorage
  51. clashModeSubscriber *observable.Subscriber[struct{}]
  52. clashModeObserver *observable.Observer[struct{}]
  53. }
  54. type ServiceOptions struct {
  55. Context context.Context
  56. // Platform adapter.PlatformInterface
  57. Handler PlatformHandler
  58. Debug bool
  59. LogMaxLines int
  60. // WorkingDirectory string
  61. // TempDirectory string
  62. // UserID int
  63. // GroupID int
  64. // SystemProxyEnabled bool
  65. }
  66. func NewStartedService(options ServiceOptions) *StartedService {
  67. s := &StartedService{
  68. ctx: options.Context,
  69. // platform: options.Platform,
  70. handler: options.Handler,
  71. debug: options.Debug,
  72. logMaxLines: options.LogMaxLines,
  73. // workingDirectory: options.WorkingDirectory,
  74. // tempDirectory: options.TempDirectory,
  75. // userID: options.UserID,
  76. // groupID: options.GroupID,
  77. // systemProxyEnabled: options.SystemProxyEnabled,
  78. serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
  79. serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
  80. logSubscriber: observable.NewSubscriber[*log.Entry](128),
  81. urlTestSubscriber: observable.NewSubscriber[struct{}](1),
  82. urlTestHistoryStorage: urltest.NewHistoryStorage(),
  83. clashModeSubscriber: observable.NewSubscriber[struct{}](1),
  84. }
  85. s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
  86. s.logObserver = observable.NewObserver(s.logSubscriber, 64)
  87. s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
  88. s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
  89. return s
  90. }
  91. func (s *StartedService) resetLogs() {
  92. s.logAccess.Lock()
  93. s.logLines = list.List[*log.Entry]{}
  94. s.logAccess.Unlock()
  95. s.logSubscriber.Emit(nil)
  96. }
  97. func (s *StartedService) updateStatus(newStatus ServiceStatus_Type) {
  98. statusObject := &ServiceStatus{Status: newStatus}
  99. s.serviceStatusSubscriber.Emit(statusObject)
  100. s.serviceStatus = statusObject
  101. }
  102. func (s *StartedService) updateStatusError(err error) error {
  103. statusObject := &ServiceStatus{Status: ServiceStatus_FATAL, ErrorMessage: err.Error()}
  104. s.serviceStatusSubscriber.Emit(statusObject)
  105. s.serviceStatus = statusObject
  106. s.serviceAccess.Unlock()
  107. return err
  108. }
  109. func (s *StartedService) waitForStarted(ctx context.Context) error {
  110. s.serviceAccess.RLock()
  111. currentStatus := s.serviceStatus.Status
  112. s.serviceAccess.RUnlock()
  113. switch currentStatus {
  114. case ServiceStatus_STARTED:
  115. return nil
  116. case ServiceStatus_STARTING:
  117. default:
  118. return os.ErrInvalid
  119. }
  120. subscription, done, err := s.serviceStatusObserver.Subscribe()
  121. if err != nil {
  122. return err
  123. }
  124. defer s.serviceStatusObserver.UnSubscribe(subscription)
  125. for {
  126. select {
  127. case <-ctx.Done():
  128. return ctx.Err()
  129. case <-s.ctx.Done():
  130. return s.ctx.Err()
  131. case status := <-subscription:
  132. switch status.Status {
  133. case ServiceStatus_STARTED:
  134. return nil
  135. case ServiceStatus_FATAL:
  136. return E.New(status.ErrorMessage)
  137. case ServiceStatus_IDLE, ServiceStatus_STOPPING:
  138. return os.ErrInvalid
  139. }
  140. case <-done:
  141. return os.ErrClosed
  142. }
  143. }
  144. }
  145. func (s *StartedService) StartOrReloadService(profileContent string, options *OverrideOptions) error {
  146. s.serviceAccess.Lock()
  147. switch s.serviceStatus.Status {
  148. case ServiceStatus_IDLE, ServiceStatus_STARTED, ServiceStatus_STARTING:
  149. default:
  150. s.serviceAccess.Unlock()
  151. return os.ErrInvalid
  152. }
  153. oldInstance := s.instance
  154. if oldInstance != nil {
  155. s.updateStatus(ServiceStatus_STOPPING)
  156. s.serviceAccess.Unlock()
  157. _ = oldInstance.Close()
  158. s.serviceAccess.Lock()
  159. }
  160. s.updateStatus(ServiceStatus_STARTING)
  161. s.resetLogs()
  162. instance, err := s.newInstance(profileContent, options)
  163. if err != nil {
  164. return s.updateStatusError(err)
  165. }
  166. s.instance = instance
  167. instance.urlTestHistoryStorage.SetHook(s.urlTestSubscriber)
  168. if instance.clashServer != nil {
  169. instance.clashServer.SetModeUpdateHook(s.clashModeSubscriber)
  170. }
  171. s.serviceAccess.Unlock()
  172. err = instance.Start()
  173. s.serviceAccess.Lock()
  174. if s.serviceStatus.Status != ServiceStatus_STARTING {
  175. s.serviceAccess.Unlock()
  176. return nil
  177. }
  178. if err != nil {
  179. return s.updateStatusError(err)
  180. }
  181. s.updateStatus(ServiceStatus_STARTED)
  182. s.serviceAccess.Unlock()
  183. runtime.GC()
  184. return nil
  185. }
  186. func (s *StartedService) CloseService() error {
  187. s.serviceAccess.Lock()
  188. switch s.serviceStatus.Status {
  189. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  190. default:
  191. s.serviceAccess.Unlock()
  192. return os.ErrInvalid
  193. }
  194. s.updateStatus(ServiceStatus_STOPPING)
  195. if s.instance != nil {
  196. err := s.instance.Close()
  197. if err != nil {
  198. return s.updateStatusError(err)
  199. }
  200. }
  201. s.instance = nil
  202. s.updateStatus(ServiceStatus_IDLE)
  203. s.serviceAccess.Unlock()
  204. runtime.GC()
  205. return nil
  206. }
  207. func (s *StartedService) SetError(err error) {
  208. s.serviceAccess.Lock()
  209. s.updateStatusError(err)
  210. s.WriteMessage(log.LevelError, err.Error())
  211. }
  212. func (s *StartedService) StopService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  213. err := s.handler.ServiceStop()
  214. if err != nil {
  215. return nil, err
  216. }
  217. return &emptypb.Empty{}, nil
  218. }
  219. func (s *StartedService) ReloadService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  220. err := s.handler.ServiceReload()
  221. if err != nil {
  222. return nil, err
  223. }
  224. return &emptypb.Empty{}, nil
  225. }
  226. func (s *StartedService) SubscribeServiceStatus(empty *emptypb.Empty, server grpc.ServerStreamingServer[ServiceStatus]) error {
  227. subscription, done, err := s.serviceStatusObserver.Subscribe()
  228. if err != nil {
  229. return err
  230. }
  231. defer s.serviceStatusObserver.UnSubscribe(subscription)
  232. err = server.Send(s.serviceStatus)
  233. if err != nil {
  234. return err
  235. }
  236. for {
  237. select {
  238. case <-s.ctx.Done():
  239. return s.ctx.Err()
  240. case <-server.Context().Done():
  241. return server.Context().Err()
  242. case newStatus := <-subscription:
  243. err = server.Send(newStatus)
  244. if err != nil {
  245. return err
  246. }
  247. case <-done:
  248. return nil
  249. }
  250. }
  251. }
  252. func (s *StartedService) SubscribeLog(empty *emptypb.Empty, server grpc.ServerStreamingServer[Log]) error {
  253. var savedLines []*log.Entry
  254. s.logAccess.Lock()
  255. savedLines = make([]*log.Entry, 0, s.logLines.Len())
  256. for element := s.logLines.Front(); element != nil; element = element.Next() {
  257. savedLines = append(savedLines, element.Value)
  258. }
  259. subscription, done, err := s.logObserver.Subscribe()
  260. s.logAccess.Unlock()
  261. if err != nil {
  262. return err
  263. }
  264. defer s.logObserver.UnSubscribe(subscription)
  265. err = server.Send(&Log{
  266. Messages: common.Map(savedLines, func(it *log.Entry) *Log_Message {
  267. return &Log_Message{
  268. Level: LogLevel(it.Level),
  269. Message: it.Message,
  270. }
  271. }),
  272. Reset_: true,
  273. })
  274. if err != nil {
  275. return err
  276. }
  277. for {
  278. select {
  279. case <-s.ctx.Done():
  280. return s.ctx.Err()
  281. case <-server.Context().Done():
  282. return server.Context().Err()
  283. case message := <-subscription:
  284. var rawMessage Log
  285. if message == nil {
  286. rawMessage.Reset_ = true
  287. } else {
  288. rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
  289. Level: LogLevel(message.Level),
  290. Message: message.Message,
  291. })
  292. }
  293. fetch:
  294. for {
  295. select {
  296. case message = <-subscription:
  297. if message == nil {
  298. rawMessage.Messages = nil
  299. rawMessage.Reset_ = true
  300. } else {
  301. rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
  302. Level: LogLevel(message.Level),
  303. Message: message.Message,
  304. })
  305. }
  306. default:
  307. break fetch
  308. }
  309. }
  310. err = server.Send(&rawMessage)
  311. if err != nil {
  312. return err
  313. }
  314. case <-done:
  315. return nil
  316. }
  317. }
  318. }
  319. func (s *StartedService) GetDefaultLogLevel(ctx context.Context, empty *emptypb.Empty) (*DefaultLogLevel, error) {
  320. s.serviceAccess.RLock()
  321. switch s.serviceStatus.Status {
  322. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  323. default:
  324. s.serviceAccess.RUnlock()
  325. return nil, os.ErrInvalid
  326. }
  327. logLevel := s.instance.instance.LogFactory().Level()
  328. s.serviceAccess.RUnlock()
  329. return &DefaultLogLevel{Level: LogLevel(logLevel)}, nil
  330. }
  331. func (s *StartedService) ClearLogs(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  332. s.resetLogs()
  333. return &emptypb.Empty{}, nil
  334. }
  335. func (s *StartedService) SubscribeStatus(request *SubscribeStatusRequest, server grpc.ServerStreamingServer[Status]) error {
  336. interval := time.Duration(request.Interval)
  337. if interval <= 0 {
  338. interval = time.Second // Default to 1 second
  339. }
  340. ticker := time.NewTicker(interval)
  341. defer ticker.Stop()
  342. status := s.readStatus()
  343. uploadTotal := status.UplinkTotal
  344. downloadTotal := status.DownlinkTotal
  345. for {
  346. err := server.Send(status)
  347. if err != nil {
  348. return err
  349. }
  350. select {
  351. case <-s.ctx.Done():
  352. return s.ctx.Err()
  353. case <-server.Context().Done():
  354. return server.Context().Err()
  355. case <-ticker.C:
  356. }
  357. status = s.readStatus()
  358. upload := status.UplinkTotal - uploadTotal
  359. download := status.DownlinkTotal - downloadTotal
  360. uploadTotal = status.UplinkTotal
  361. downloadTotal = status.DownlinkTotal
  362. status.Uplink = upload
  363. status.Downlink = download
  364. }
  365. }
  366. func (s *StartedService) readStatus() *Status {
  367. var status Status
  368. status.Memory = memory.Inuse()
  369. status.Goroutines = int32(runtime.NumGoroutine())
  370. status.ConnectionsOut = int32(conntrack.Count())
  371. s.serviceAccess.RLock()
  372. nowService := s.instance
  373. s.serviceAccess.RUnlock()
  374. if nowService != nil {
  375. if clashServer := nowService.clashServer; clashServer != nil {
  376. status.TrafficAvailable = true
  377. trafficManager := clashServer.(*clashapi.Server).TrafficManager()
  378. status.UplinkTotal, status.DownlinkTotal = trafficManager.Total()
  379. status.ConnectionsIn = int32(trafficManager.ConnectionsLen())
  380. }
  381. }
  382. return &status
  383. }
  384. func (s *StartedService) SubscribeGroups(empty *emptypb.Empty, server grpc.ServerStreamingServer[Groups]) error {
  385. err := s.waitForStarted(server.Context())
  386. if err != nil {
  387. return err
  388. }
  389. subscription, done, err := s.urlTestObserver.Subscribe()
  390. if err != nil {
  391. return err
  392. }
  393. defer s.urlTestObserver.UnSubscribe(subscription)
  394. for {
  395. s.serviceAccess.RLock()
  396. if s.serviceStatus.Status != ServiceStatus_STARTED {
  397. s.serviceAccess.RUnlock()
  398. return os.ErrInvalid
  399. }
  400. groups := s.readGroups()
  401. s.serviceAccess.RUnlock()
  402. err = server.Send(groups)
  403. if err != nil {
  404. return err
  405. }
  406. select {
  407. case <-subscription:
  408. case <-s.ctx.Done():
  409. return s.ctx.Err()
  410. case <-server.Context().Done():
  411. return server.Context().Err()
  412. case <-done:
  413. return nil
  414. }
  415. }
  416. }
  417. func (s *StartedService) readGroups() *Groups {
  418. historyStorage := s.instance.urlTestHistoryStorage
  419. boxService := s.instance
  420. outbounds := boxService.instance.Outbound().Outbounds()
  421. var iGroups []adapter.OutboundGroup
  422. for _, it := range outbounds {
  423. if group, isGroup := it.(adapter.OutboundGroup); isGroup {
  424. iGroups = append(iGroups, group)
  425. }
  426. }
  427. var gs Groups
  428. for _, iGroup := range iGroups {
  429. var g Group
  430. g.Tag = iGroup.Tag()
  431. g.Type = iGroup.Type()
  432. _, g.Selectable = iGroup.(*group.Selector)
  433. g.Selected = iGroup.Now()
  434. if boxService.cacheFile != nil {
  435. if isExpand, loaded := boxService.cacheFile.LoadGroupExpand(g.Tag); loaded {
  436. g.IsExpand = isExpand
  437. }
  438. }
  439. for _, itemTag := range iGroup.All() {
  440. itemOutbound, isLoaded := boxService.instance.Outbound().Outbound(itemTag)
  441. if !isLoaded {
  442. continue
  443. }
  444. var item GroupItem
  445. item.Tag = itemTag
  446. item.Type = itemOutbound.Type()
  447. if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(itemOutbound)); history != nil {
  448. item.UrlTestTime = history.Time.Unix()
  449. item.UrlTestDelay = int32(history.Delay)
  450. }
  451. g.Items = append(g.Items, &item)
  452. }
  453. if len(g.Items) < 2 {
  454. continue
  455. }
  456. gs.Group = append(gs.Group, &g)
  457. }
  458. return &gs
  459. }
  460. func (s *StartedService) GetClashModeStatus(ctx context.Context, empty *emptypb.Empty) (*ClashModeStatus, error) {
  461. s.serviceAccess.RLock()
  462. if s.serviceStatus.Status != ServiceStatus_STARTED {
  463. s.serviceAccess.RUnlock()
  464. return nil, os.ErrInvalid
  465. }
  466. clashServer := s.instance.clashServer
  467. s.serviceAccess.RUnlock()
  468. if clashServer == nil {
  469. return nil, os.ErrInvalid
  470. }
  471. return &ClashModeStatus{
  472. ModeList: clashServer.ModeList(),
  473. CurrentMode: clashServer.Mode(),
  474. }, nil
  475. }
  476. func (s *StartedService) SubscribeClashMode(empty *emptypb.Empty, server grpc.ServerStreamingServer[ClashMode]) error {
  477. err := s.waitForStarted(server.Context())
  478. if err != nil {
  479. return err
  480. }
  481. subscription, done, err := s.clashModeObserver.Subscribe()
  482. if err != nil {
  483. return err
  484. }
  485. defer s.clashModeObserver.UnSubscribe(subscription)
  486. for {
  487. s.serviceAccess.RLock()
  488. if s.serviceStatus.Status != ServiceStatus_STARTED {
  489. s.serviceAccess.RUnlock()
  490. return os.ErrInvalid
  491. }
  492. message := &ClashMode{Mode: s.instance.clashServer.Mode()}
  493. s.serviceAccess.RUnlock()
  494. err = server.Send(message)
  495. if err != nil {
  496. return err
  497. }
  498. select {
  499. case <-subscription:
  500. case <-s.ctx.Done():
  501. return s.ctx.Err()
  502. case <-server.Context().Done():
  503. return server.Context().Err()
  504. case <-done:
  505. return nil
  506. }
  507. }
  508. }
  509. func (s *StartedService) SetClashMode(ctx context.Context, request *ClashMode) (*emptypb.Empty, error) {
  510. s.serviceAccess.RLock()
  511. if s.serviceStatus.Status != ServiceStatus_STARTED {
  512. s.serviceAccess.RUnlock()
  513. return nil, os.ErrInvalid
  514. }
  515. clashServer := s.instance.clashServer
  516. s.serviceAccess.RUnlock()
  517. clashServer.(*clashapi.Server).SetMode(request.Mode)
  518. return &emptypb.Empty{}, nil
  519. }
  520. func (s *StartedService) URLTest(ctx context.Context, request *URLTestRequest) (*emptypb.Empty, error) {
  521. s.serviceAccess.RLock()
  522. if s.serviceStatus.Status != ServiceStatus_STARTED {
  523. s.serviceAccess.RUnlock()
  524. return nil, os.ErrInvalid
  525. }
  526. boxService := s.instance
  527. s.serviceAccess.RUnlock()
  528. groupTag := request.OutboundTag
  529. abstractOutboundGroup, isLoaded := boxService.instance.Outbound().Outbound(groupTag)
  530. if !isLoaded {
  531. return nil, E.New("outbound group not found: ", groupTag)
  532. }
  533. outboundGroup, isOutboundGroup := abstractOutboundGroup.(adapter.OutboundGroup)
  534. if !isOutboundGroup {
  535. return nil, E.New("outbound is not a group: ", groupTag)
  536. }
  537. urlTest, isURLTest := abstractOutboundGroup.(*group.URLTest)
  538. if isURLTest {
  539. go urlTest.CheckOutbounds()
  540. } else {
  541. historyStorage := boxService.urlTestHistoryStorage
  542. outbounds := common.Filter(common.Map(outboundGroup.All(), func(it string) adapter.Outbound {
  543. itOutbound, _ := boxService.instance.Outbound().Outbound(it)
  544. return itOutbound
  545. }), func(it adapter.Outbound) bool {
  546. if it == nil {
  547. return false
  548. }
  549. _, isGroup := it.(adapter.OutboundGroup)
  550. if isGroup {
  551. return false
  552. }
  553. return true
  554. })
  555. b, _ := batch.New(boxService.ctx, batch.WithConcurrencyNum[any](10))
  556. for _, detour := range outbounds {
  557. outboundToTest := detour
  558. outboundTag := outboundToTest.Tag()
  559. b.Go(outboundTag, func() (any, error) {
  560. t, err := urltest.URLTest(boxService.ctx, "", outboundToTest)
  561. if err != nil {
  562. historyStorage.DeleteURLTestHistory(outboundTag)
  563. } else {
  564. historyStorage.StoreURLTestHistory(outboundTag, &adapter.URLTestHistory{
  565. Time: time.Now(),
  566. Delay: t,
  567. })
  568. }
  569. return nil, nil
  570. })
  571. }
  572. }
  573. return &emptypb.Empty{}, nil
  574. }
  575. func (s *StartedService) SelectOutbound(ctx context.Context, request *SelectOutboundRequest) (*emptypb.Empty, error) {
  576. s.serviceAccess.RLock()
  577. switch s.serviceStatus.Status {
  578. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  579. default:
  580. s.serviceAccess.RUnlock()
  581. return nil, os.ErrInvalid
  582. }
  583. boxService := s.instance.instance
  584. s.serviceAccess.RUnlock()
  585. outboundGroup, isLoaded := boxService.Outbound().Outbound(request.GroupTag)
  586. if !isLoaded {
  587. return nil, E.New("selector not found: ", request.GroupTag)
  588. }
  589. selector, isSelector := outboundGroup.(*group.Selector)
  590. if !isSelector {
  591. return nil, E.New("outbound is not a selector: ", request.GroupTag)
  592. }
  593. if !selector.SelectOutbound(request.OutboundTag) {
  594. return nil, E.New("outbound not found in selector: ", request.OutboundTag)
  595. }
  596. s.urlTestObserver.Emit(struct{}{})
  597. return &emptypb.Empty{}, nil
  598. }
  599. func (s *StartedService) SetGroupExpand(ctx context.Context, request *SetGroupExpandRequest) (*emptypb.Empty, error) {
  600. s.serviceAccess.RLock()
  601. switch s.serviceStatus.Status {
  602. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  603. default:
  604. s.serviceAccess.RUnlock()
  605. return nil, os.ErrInvalid
  606. }
  607. boxService := s.instance
  608. s.serviceAccess.RUnlock()
  609. if boxService.cacheFile != nil {
  610. err := boxService.cacheFile.StoreGroupExpand(request.GroupTag, request.IsExpand)
  611. if err != nil {
  612. return nil, err
  613. }
  614. }
  615. return &emptypb.Empty{}, nil
  616. }
  617. func (s *StartedService) GetSystemProxyStatus(ctx context.Context, empty *emptypb.Empty) (*SystemProxyStatus, error) {
  618. return s.handler.SystemProxyStatus()
  619. }
  620. func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *SetSystemProxyEnabledRequest) (*emptypb.Empty, error) {
  621. err := s.handler.SetSystemProxyEnabled(request.Enabled)
  622. if err != nil {
  623. return nil, err
  624. }
  625. return nil, err
  626. }
  627. func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[Connections]) error {
  628. err := s.waitForStarted(server.Context())
  629. if err != nil {
  630. return err
  631. }
  632. s.serviceAccess.RLock()
  633. boxService := s.instance
  634. s.serviceAccess.RUnlock()
  635. ticker := time.NewTicker(time.Duration(request.Interval))
  636. defer ticker.Stop()
  637. trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
  638. var (
  639. connections = make(map[uuid.UUID]*Connection)
  640. outConnections []*Connection
  641. )
  642. for {
  643. outConnections = outConnections[:0]
  644. for _, connection := range trafficManager.Connections() {
  645. outConnections = append(outConnections, newConnection(connections, connection, false))
  646. }
  647. for _, connection := range trafficManager.ClosedConnections() {
  648. outConnections = append(outConnections, newConnection(connections, connection, true))
  649. }
  650. err := server.Send(&Connections{Connections: outConnections})
  651. if err != nil {
  652. return err
  653. }
  654. select {
  655. case <-s.ctx.Done():
  656. return s.ctx.Err()
  657. case <-server.Context().Done():
  658. return server.Context().Err()
  659. case <-ticker.C:
  660. }
  661. }
  662. }
  663. func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol.TrackerMetadata, isClosed bool) *Connection {
  664. if oldConnection, loaded := connections[metadata.ID]; loaded {
  665. if isClosed {
  666. if oldConnection.ClosedAt == 0 {
  667. oldConnection.Uplink = 0
  668. oldConnection.Downlink = 0
  669. oldConnection.ClosedAt = metadata.ClosedAt.UnixMilli()
  670. }
  671. return oldConnection
  672. }
  673. lastUplink := oldConnection.UplinkTotal
  674. lastDownlink := oldConnection.DownlinkTotal
  675. uplinkTotal := metadata.Upload.Load()
  676. downlinkTotal := metadata.Download.Load()
  677. oldConnection.Uplink = uplinkTotal - lastUplink
  678. oldConnection.Downlink = downlinkTotal - lastDownlink
  679. oldConnection.UplinkTotal = uplinkTotal
  680. oldConnection.DownlinkTotal = downlinkTotal
  681. return oldConnection
  682. }
  683. var rule string
  684. if metadata.Rule != nil {
  685. rule = metadata.Rule.String()
  686. }
  687. uplinkTotal := metadata.Upload.Load()
  688. downlinkTotal := metadata.Download.Load()
  689. uplink := uplinkTotal
  690. downlink := downlinkTotal
  691. var closedAt int64
  692. if !metadata.ClosedAt.IsZero() {
  693. closedAt = metadata.ClosedAt.UnixMilli()
  694. uplink = 0
  695. downlink = 0
  696. }
  697. connection := &Connection{
  698. Id: metadata.ID.String(),
  699. Inbound: metadata.Metadata.Inbound,
  700. InboundType: metadata.Metadata.InboundType,
  701. IpVersion: int32(metadata.Metadata.IPVersion),
  702. Network: metadata.Metadata.Network,
  703. Source: metadata.Metadata.Source.String(),
  704. Destination: metadata.Metadata.Destination.String(),
  705. Domain: metadata.Metadata.Domain,
  706. Protocol: metadata.Metadata.Protocol,
  707. User: metadata.Metadata.User,
  708. FromOutbound: metadata.Metadata.Outbound,
  709. CreatedAt: metadata.CreatedAt.UnixMilli(),
  710. ClosedAt: closedAt,
  711. Uplink: uplink,
  712. Downlink: downlink,
  713. UplinkTotal: uplinkTotal,
  714. DownlinkTotal: downlinkTotal,
  715. Rule: rule,
  716. Outbound: metadata.Outbound,
  717. OutboundType: metadata.OutboundType,
  718. ChainList: metadata.Chain,
  719. }
  720. connections[metadata.ID] = connection
  721. return connection
  722. }
  723. func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {
  724. s.serviceAccess.RLock()
  725. switch s.serviceStatus.Status {
  726. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  727. default:
  728. s.serviceAccess.RUnlock()
  729. return nil, os.ErrInvalid
  730. }
  731. boxService := s.instance
  732. s.serviceAccess.RUnlock()
  733. targetConn := boxService.clashServer.(*clashapi.Server).TrafficManager().Connection(uuid.FromStringOrNil(request.Id))
  734. if targetConn != nil {
  735. targetConn.Close()
  736. }
  737. return &emptypb.Empty{}, nil
  738. }
  739. func (s *StartedService) CloseAllConnections(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  740. conntrack.Close()
  741. return &emptypb.Empty{}, nil
  742. }
  743. func (s *StartedService) GetDeprecatedWarnings(ctx context.Context, empty *emptypb.Empty) (*DeprecatedWarnings, error) {
  744. s.serviceAccess.RLock()
  745. if s.serviceStatus.Status != ServiceStatus_STARTED {
  746. s.serviceAccess.RUnlock()
  747. return nil, os.ErrInvalid
  748. }
  749. boxService := s.instance
  750. s.serviceAccess.RUnlock()
  751. notes := service.FromContext[deprecated.Manager](boxService.ctx).(*deprecatedManager).Get()
  752. return &DeprecatedWarnings{
  753. Warnings: common.Map(notes, func(it deprecated.Note) *DeprecatedWarning {
  754. return &DeprecatedWarning{
  755. Message: it.Message(),
  756. Impending: it.Impending(),
  757. MigrationLink: it.MigrationLink,
  758. }
  759. }),
  760. }, nil
  761. }
  762. func (s *StartedService) SubscribeHelperEvents(empty *emptypb.Empty, server grpc.ServerStreamingServer[HelperRequest]) error {
  763. return os.ErrInvalid
  764. }
  765. func (s *StartedService) SendHelperResponse(ctx context.Context, response *HelperResponse) (*emptypb.Empty, error) {
  766. return nil, os.ErrInvalid
  767. }
  768. func (s *StartedService) mustEmbedUnimplementedStartedServiceServer() {
  769. }
  770. func (s *StartedService) WriteMessage(level log.Level, message string) {
  771. item := &log.Entry{Level: level, Message: message}
  772. s.logAccess.Lock()
  773. s.logLines.PushBack(item)
  774. if s.logLines.Len() > s.logMaxLines {
  775. s.logLines.Remove(s.logLines.Front())
  776. }
  777. s.logAccess.Unlock()
  778. s.logSubscriber.Emit(item)
  779. if s.debug {
  780. s.handler.WriteDebugMessage(message)
  781. }
  782. }
  783. func (s *StartedService) Instance() *Instance {
  784. s.serviceAccess.RLock()
  785. defer s.serviceAccess.RUnlock()
  786. return s.instance
  787. }