started_service.go 23 KB


  1. package daemon
  2. import (
  3. "context"
  4. "os"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "github.com/sagernet/sing-box/adapter"
  9. "github.com/sagernet/sing-box/common/conntrack"
  10. "github.com/sagernet/sing-box/common/urltest"
  11. "github.com/sagernet/sing-box/experimental/clashapi"
  12. "github.com/sagernet/sing-box/experimental/clashapi/trafficontrol"
  13. "github.com/sagernet/sing-box/experimental/deprecated"
  14. "github.com/sagernet/sing-box/log"
  15. "github.com/sagernet/sing-box/protocol/group"
  16. "github.com/sagernet/sing/common"
  17. "github.com/sagernet/sing/common/batch"
  18. E "github.com/sagernet/sing/common/exceptions"
  19. "github.com/sagernet/sing/common/memory"
  20. "github.com/sagernet/sing/common/observable"
  21. "github.com/sagernet/sing/common/x/list"
  22. "github.com/sagernet/sing/service"
  23. "github.com/gofrs/uuid/v5"
  24. "google.golang.org/grpc"
  25. "google.golang.org/protobuf/types/known/emptypb"
  26. )
  27. var _ StartedServiceServer = (*StartedService)(nil)
  28. type StartedService struct {
  29. ctx context.Context
  30. // platform adapter.PlatformInterface
  31. handler PlatformHandler
  32. debug bool
  33. logMaxLines int
  34. // workingDirectory string
  35. // tempDirectory string
  36. // userID int
  37. // groupID int
  38. // systemProxyEnabled bool
  39. serviceAccess sync.RWMutex
  40. serviceStatus *ServiceStatus
  41. serviceStatusSubscriber *observable.Subscriber[*ServiceStatus]
  42. serviceStatusObserver *observable.Observer[*ServiceStatus]
  43. logAccess sync.RWMutex
  44. logLines list.List[*log.Entry]
  45. logSubscriber *observable.Subscriber[*log.Entry]
  46. logObserver *observable.Observer[*log.Entry]
  47. instance *Instance
  48. urlTestSubscriber *observable.Subscriber[struct{}]
  49. urlTestObserver *observable.Observer[struct{}]
  50. urlTestHistoryStorage *urltest.HistoryStorage
  51. clashModeSubscriber *observable.Subscriber[struct{}]
  52. clashModeObserver *observable.Observer[struct{}]
  53. }
  54. type ServiceOptions struct {
  55. Context context.Context
  56. // Platform adapter.PlatformInterface
  57. Handler PlatformHandler
  58. Debug bool
  59. LogMaxLines int
  60. // WorkingDirectory string
  61. // TempDirectory string
  62. // UserID int
  63. // GroupID int
  64. // SystemProxyEnabled bool
  65. }
  66. func NewStartedService(options ServiceOptions) *StartedService {
  67. s := &StartedService{
  68. ctx: options.Context,
  69. // platform: options.Platform,
  70. handler: options.Handler,
  71. debug: options.Debug,
  72. logMaxLines: options.LogMaxLines,
  73. // workingDirectory: options.WorkingDirectory,
  74. // tempDirectory: options.TempDirectory,
  75. // userID: options.UserID,
  76. // groupID: options.GroupID,
  77. // systemProxyEnabled: options.SystemProxyEnabled,
  78. serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
  79. serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
  80. logSubscriber: observable.NewSubscriber[*log.Entry](128),
  81. urlTestSubscriber: observable.NewSubscriber[struct{}](1),
  82. urlTestHistoryStorage: urltest.NewHistoryStorage(),
  83. clashModeSubscriber: observable.NewSubscriber[struct{}](1),
  84. }
  85. s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
  86. s.logObserver = observable.NewObserver(s.logSubscriber, 64)
  87. s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
  88. s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
  89. return s
  90. }
  91. func (s *StartedService) resetLogs() {
  92. s.logAccess.Lock()
  93. s.logLines = list.List[*log.Entry]{}
  94. s.logAccess.Unlock()
  95. s.logSubscriber.Emit(nil)
  96. }
  97. func (s *StartedService) updateStatus(newStatus ServiceStatus_Type) {
  98. statusObject := &ServiceStatus{Status: newStatus}
  99. s.serviceStatusSubscriber.Emit(statusObject)
  100. s.serviceStatus = statusObject
  101. }
  102. func (s *StartedService) updateStatusError(err error) error {
  103. statusObject := &ServiceStatus{Status: ServiceStatus_FATAL, ErrorMessage: err.Error()}
  104. s.serviceStatusSubscriber.Emit(statusObject)
  105. s.serviceStatus = statusObject
  106. s.serviceAccess.Unlock()
  107. return err
  108. }
  109. func (s *StartedService) waitForStarted(ctx context.Context) error {
  110. s.serviceAccess.RLock()
  111. currentStatus := s.serviceStatus.Status
  112. s.serviceAccess.RUnlock()
  113. switch currentStatus {
  114. case ServiceStatus_STARTED:
  115. return nil
  116. case ServiceStatus_STARTING:
  117. default:
  118. return os.ErrInvalid
  119. }
  120. subscription, done, err := s.serviceStatusObserver.Subscribe()
  121. if err != nil {
  122. return err
  123. }
  124. defer s.serviceStatusObserver.UnSubscribe(subscription)
  125. for {
  126. select {
  127. case <-ctx.Done():
  128. return ctx.Err()
  129. case <-s.ctx.Done():
  130. return s.ctx.Err()
  131. case status := <-subscription:
  132. switch status.Status {
  133. case ServiceStatus_STARTED:
  134. return nil
  135. case ServiceStatus_FATAL:
  136. return E.New(status.ErrorMessage)
  137. case ServiceStatus_IDLE, ServiceStatus_STOPPING:
  138. return os.ErrInvalid
  139. }
  140. case <-done:
  141. return os.ErrClosed
  142. }
  143. }
  144. }
  145. func (s *StartedService) StartOrReloadService(profileContent string, options *OverrideOptions) error {
  146. s.serviceAccess.Lock()
  147. switch s.serviceStatus.Status {
  148. case ServiceStatus_IDLE, ServiceStatus_STARTED, ServiceStatus_STARTING:
  149. default:
  150. s.serviceAccess.Unlock()
  151. return os.ErrInvalid
  152. }
  153. oldInstance := s.instance
  154. if oldInstance != nil {
  155. s.updateStatus(ServiceStatus_STOPPING)
  156. s.serviceAccess.Unlock()
  157. _ = oldInstance.Close()
  158. s.serviceAccess.Lock()
  159. }
  160. s.updateStatus(ServiceStatus_STARTING)
  161. s.resetLogs()
  162. instance, err := s.newInstance(profileContent, options)
  163. if err != nil {
  164. return s.updateStatusError(err)
  165. }
  166. s.instance = instance
  167. instance.urlTestHistoryStorage.SetHook(s.urlTestSubscriber)
  168. if instance.clashServer != nil {
  169. instance.clashServer.SetModeUpdateHook(s.clashModeSubscriber)
  170. }
  171. s.serviceAccess.Unlock()
  172. err = instance.Start()
  173. s.serviceAccess.Lock()
  174. if s.serviceStatus.Status != ServiceStatus_STARTING {
  175. s.serviceAccess.Unlock()
  176. return nil
  177. }
  178. if err != nil {
  179. return s.updateStatusError(err)
  180. }
  181. s.updateStatus(ServiceStatus_STARTED)
  182. s.serviceAccess.Unlock()
  183. runtime.GC()
  184. return nil
  185. }
  186. func (s *StartedService) CloseService() error {
  187. s.serviceAccess.Lock()
  188. switch s.serviceStatus.Status {
  189. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  190. default:
  191. s.serviceAccess.Unlock()
  192. return os.ErrInvalid
  193. }
  194. s.updateStatus(ServiceStatus_STOPPING)
  195. if s.instance != nil {
  196. err := s.instance.Close()
  197. if err != nil {
  198. return s.updateStatusError(err)
  199. }
  200. }
  201. s.instance = nil
  202. s.updateStatus(ServiceStatus_IDLE)
  203. s.serviceAccess.Unlock()
  204. runtime.GC()
  205. return nil
  206. }
  207. func (s *StartedService) SetError(err error) {
  208. s.serviceAccess.Lock()
  209. s.updateStatusError(err)
  210. s.WriteMessage(log.LevelError, err.Error())
  211. }
  212. func (s *StartedService) StopService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  213. err := s.handler.ServiceStop()
  214. if err != nil {
  215. return nil, err
  216. }
  217. return &emptypb.Empty{}, nil
  218. }
  219. func (s *StartedService) ReloadService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  220. err := s.handler.ServiceReload()
  221. if err != nil {
  222. return nil, err
  223. }
  224. return &emptypb.Empty{}, nil
  225. }
  226. func (s *StartedService) SubscribeServiceStatus(empty *emptypb.Empty, server grpc.ServerStreamingServer[ServiceStatus]) error {
  227. subscription, done, err := s.serviceStatusObserver.Subscribe()
  228. if err != nil {
  229. return err
  230. }
  231. defer s.serviceStatusObserver.UnSubscribe(subscription)
  232. err = server.Send(s.serviceStatus)
  233. if err != nil {
  234. return err
  235. }
  236. for {
  237. select {
  238. case <-s.ctx.Done():
  239. return s.ctx.Err()
  240. case <-server.Context().Done():
  241. return server.Context().Err()
  242. case newStatus := <-subscription:
  243. err = server.Send(newStatus)
  244. if err != nil {
  245. return err
  246. }
  247. case <-done:
  248. return nil
  249. }
  250. }
  251. }
  252. func (s *StartedService) SubscribeLog(empty *emptypb.Empty, server grpc.ServerStreamingServer[Log]) error {
  253. var savedLines []*log.Entry
  254. s.logAccess.Lock()
  255. savedLines = make([]*log.Entry, 0, s.logLines.Len())
  256. for element := s.logLines.Front(); element != nil; element = element.Next() {
  257. savedLines = append(savedLines, element.Value)
  258. }
  259. s.logAccess.Unlock()
  260. subscription, done, err := s.logObserver.Subscribe()
  261. if err != nil {
  262. return err
  263. }
  264. defer s.logObserver.UnSubscribe(subscription)
  265. err = server.Send(&Log{
  266. Messages: common.Map(savedLines, func(it *log.Entry) *Log_Message {
  267. return &Log_Message{
  268. Level: LogLevel(it.Level),
  269. Message: it.Message,
  270. }
  271. }),
  272. Reset_: true,
  273. })
  274. if err != nil {
  275. return err
  276. }
  277. for {
  278. select {
  279. case <-s.ctx.Done():
  280. return s.ctx.Err()
  281. case <-server.Context().Done():
  282. return server.Context().Err()
  283. case message := <-subscription:
  284. if message == nil {
  285. err = server.Send(&Log{Reset_: true})
  286. if err != nil {
  287. return err
  288. }
  289. continue
  290. }
  291. messages := []*Log_Message{{
  292. Level: LogLevel(message.Level),
  293. Message: message.Message,
  294. }}
  295. fetch:
  296. for {
  297. select {
  298. case message = <-subscription:
  299. messages = append(messages, &Log_Message{
  300. Level: LogLevel(message.Level),
  301. Message: message.Message,
  302. })
  303. default:
  304. break fetch
  305. }
  306. }
  307. err = server.Send(&Log{Messages: messages})
  308. if err != nil {
  309. return err
  310. }
  311. case <-done:
  312. return nil
  313. }
  314. }
  315. }
  316. func (s *StartedService) GetDefaultLogLevel(ctx context.Context, empty *emptypb.Empty) (*DefaultLogLevel, error) {
  317. s.serviceAccess.RLock()
  318. switch s.serviceStatus.Status {
  319. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  320. default:
  321. s.serviceAccess.RUnlock()
  322. return nil, os.ErrInvalid
  323. }
  324. logLevel := s.instance.instance.LogFactory().Level()
  325. s.serviceAccess.RUnlock()
  326. return &DefaultLogLevel{Level: LogLevel(logLevel)}, nil
  327. }
  328. func (s *StartedService) SubscribeStatus(request *SubscribeStatusRequest, server grpc.ServerStreamingServer[Status]) error {
  329. interval := time.Duration(request.Interval)
  330. if interval <= 0 {
  331. interval = time.Second // Default to 1 second
  332. }
  333. ticker := time.NewTicker(interval)
  334. defer ticker.Stop()
  335. status := s.readStatus()
  336. uploadTotal := status.UplinkTotal
  337. downloadTotal := status.DownlinkTotal
  338. for {
  339. err := server.Send(status)
  340. if err != nil {
  341. return err
  342. }
  343. select {
  344. case <-s.ctx.Done():
  345. return s.ctx.Err()
  346. case <-server.Context().Done():
  347. return server.Context().Err()
  348. case <-ticker.C:
  349. }
  350. status = s.readStatus()
  351. upload := status.UplinkTotal - uploadTotal
  352. download := status.DownlinkTotal - downloadTotal
  353. uploadTotal = status.UplinkTotal
  354. downloadTotal = status.DownlinkTotal
  355. status.Uplink = upload
  356. status.Downlink = download
  357. }
  358. }
  359. func (s *StartedService) readStatus() *Status {
  360. var status Status
  361. status.Memory = memory.Inuse()
  362. status.Goroutines = int32(runtime.NumGoroutine())
  363. status.ConnectionsOut = int32(conntrack.Count())
  364. s.serviceAccess.RLock()
  365. nowService := s.instance
  366. s.serviceAccess.RUnlock()
  367. if nowService != nil {
  368. if clashServer := nowService.clashServer; clashServer != nil {
  369. status.TrafficAvailable = true
  370. trafficManager := clashServer.(*clashapi.Server).TrafficManager()
  371. status.UplinkTotal, status.DownlinkTotal = trafficManager.Total()
  372. status.ConnectionsIn = int32(trafficManager.ConnectionsLen())
  373. }
  374. }
  375. return &status
  376. }
  377. func (s *StartedService) SubscribeGroups(empty *emptypb.Empty, server grpc.ServerStreamingServer[Groups]) error {
  378. err := s.waitForStarted(server.Context())
  379. if err != nil {
  380. return err
  381. }
  382. subscription, done, err := s.urlTestObserver.Subscribe()
  383. if err != nil {
  384. return err
  385. }
  386. defer s.urlTestObserver.UnSubscribe(subscription)
  387. for {
  388. s.serviceAccess.RLock()
  389. if s.serviceStatus.Status != ServiceStatus_STARTED {
  390. s.serviceAccess.RUnlock()
  391. return os.ErrInvalid
  392. }
  393. groups := s.readGroups()
  394. s.serviceAccess.RUnlock()
  395. err = server.Send(groups)
  396. if err != nil {
  397. return err
  398. }
  399. select {
  400. case <-subscription:
  401. case <-s.ctx.Done():
  402. return s.ctx.Err()
  403. case <-server.Context().Done():
  404. return server.Context().Err()
  405. case <-done:
  406. return nil
  407. }
  408. }
  409. }
  410. func (s *StartedService) readGroups() *Groups {
  411. historyStorage := s.instance.urlTestHistoryStorage
  412. boxService := s.instance
  413. outbounds := boxService.instance.Outbound().Outbounds()
  414. var iGroups []adapter.OutboundGroup
  415. for _, it := range outbounds {
  416. if group, isGroup := it.(adapter.OutboundGroup); isGroup {
  417. iGroups = append(iGroups, group)
  418. }
  419. }
  420. var gs Groups
  421. for _, iGroup := range iGroups {
  422. var g Group
  423. g.Tag = iGroup.Tag()
  424. g.Type = iGroup.Type()
  425. _, g.Selectable = iGroup.(*group.Selector)
  426. g.Selected = iGroup.Now()
  427. if boxService.cacheFile != nil {
  428. if isExpand, loaded := boxService.cacheFile.LoadGroupExpand(g.Tag); loaded {
  429. g.IsExpand = isExpand
  430. }
  431. }
  432. for _, itemTag := range iGroup.All() {
  433. itemOutbound, isLoaded := boxService.instance.Outbound().Outbound(itemTag)
  434. if !isLoaded {
  435. continue
  436. }
  437. var item GroupItem
  438. item.Tag = itemTag
  439. item.Type = itemOutbound.Type()
  440. if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(itemOutbound)); history != nil {
  441. item.UrlTestTime = history.Time.Unix()
  442. item.UrlTestDelay = int32(history.Delay)
  443. }
  444. g.Items = append(g.Items, &item)
  445. }
  446. if len(g.Items) < 2 {
  447. continue
  448. }
  449. gs.Group = append(gs.Group, &g)
  450. }
  451. return &gs
  452. }
  453. func (s *StartedService) GetClashModeStatus(ctx context.Context, empty *emptypb.Empty) (*ClashModeStatus, error) {
  454. s.serviceAccess.RLock()
  455. if s.serviceStatus.Status != ServiceStatus_STARTED {
  456. s.serviceAccess.RUnlock()
  457. return nil, os.ErrInvalid
  458. }
  459. clashServer := s.instance.clashServer
  460. s.serviceAccess.RUnlock()
  461. if clashServer == nil {
  462. return nil, os.ErrInvalid
  463. }
  464. return &ClashModeStatus{
  465. ModeList: clashServer.ModeList(),
  466. CurrentMode: clashServer.Mode(),
  467. }, nil
  468. }
  469. func (s *StartedService) SubscribeClashMode(empty *emptypb.Empty, server grpc.ServerStreamingServer[ClashMode]) error {
  470. err := s.waitForStarted(server.Context())
  471. if err != nil {
  472. return err
  473. }
  474. subscription, done, err := s.clashModeObserver.Subscribe()
  475. if err != nil {
  476. return err
  477. }
  478. defer s.clashModeObserver.UnSubscribe(subscription)
  479. for {
  480. s.serviceAccess.RLock()
  481. if s.serviceStatus.Status != ServiceStatus_STARTED {
  482. s.serviceAccess.RUnlock()
  483. return os.ErrInvalid
  484. }
  485. message := &ClashMode{Mode: s.instance.clashServer.Mode()}
  486. s.serviceAccess.RUnlock()
  487. err = server.Send(message)
  488. if err != nil {
  489. return err
  490. }
  491. select {
  492. case <-subscription:
  493. case <-s.ctx.Done():
  494. return s.ctx.Err()
  495. case <-server.Context().Done():
  496. return server.Context().Err()
  497. case <-done:
  498. return nil
  499. }
  500. }
  501. }
  502. func (s *StartedService) SetClashMode(ctx context.Context, request *ClashMode) (*emptypb.Empty, error) {
  503. s.serviceAccess.RLock()
  504. if s.serviceStatus.Status != ServiceStatus_STARTED {
  505. s.serviceAccess.RUnlock()
  506. return nil, os.ErrInvalid
  507. }
  508. clashServer := s.instance.clashServer
  509. s.serviceAccess.RUnlock()
  510. clashServer.(*clashapi.Server).SetMode(request.Mode)
  511. return &emptypb.Empty{}, nil
  512. }
  513. func (s *StartedService) URLTest(ctx context.Context, request *URLTestRequest) (*emptypb.Empty, error) {
  514. s.serviceAccess.RLock()
  515. if s.serviceStatus.Status != ServiceStatus_STARTED {
  516. s.serviceAccess.RUnlock()
  517. return nil, os.ErrInvalid
  518. }
  519. boxService := s.instance
  520. s.serviceAccess.RUnlock()
  521. groupTag := request.OutboundTag
  522. abstractOutboundGroup, isLoaded := boxService.instance.Outbound().Outbound(groupTag)
  523. if !isLoaded {
  524. return nil, E.New("outbound group not found: ", groupTag)
  525. }
  526. outboundGroup, isOutboundGroup := abstractOutboundGroup.(adapter.OutboundGroup)
  527. if !isOutboundGroup {
  528. return nil, E.New("outbound is not a group: ", groupTag)
  529. }
  530. urlTest, isURLTest := abstractOutboundGroup.(*group.URLTest)
  531. if isURLTest {
  532. go urlTest.CheckOutbounds()
  533. } else {
  534. historyStorage := boxService.urlTestHistoryStorage
  535. outbounds := common.Filter(common.Map(outboundGroup.All(), func(it string) adapter.Outbound {
  536. itOutbound, _ := boxService.instance.Outbound().Outbound(it)
  537. return itOutbound
  538. }), func(it adapter.Outbound) bool {
  539. if it == nil {
  540. return false
  541. }
  542. _, isGroup := it.(adapter.OutboundGroup)
  543. if isGroup {
  544. return false
  545. }
  546. return true
  547. })
  548. b, _ := batch.New(boxService.ctx, batch.WithConcurrencyNum[any](10))
  549. for _, detour := range outbounds {
  550. outboundToTest := detour
  551. outboundTag := outboundToTest.Tag()
  552. b.Go(outboundTag, func() (any, error) {
  553. t, err := urltest.URLTest(boxService.ctx, "", outboundToTest)
  554. if err != nil {
  555. historyStorage.DeleteURLTestHistory(outboundTag)
  556. } else {
  557. historyStorage.StoreURLTestHistory(outboundTag, &adapter.URLTestHistory{
  558. Time: time.Now(),
  559. Delay: t,
  560. })
  561. }
  562. return nil, nil
  563. })
  564. }
  565. }
  566. return &emptypb.Empty{}, nil
  567. }
  568. func (s *StartedService) SelectOutbound(ctx context.Context, request *SelectOutboundRequest) (*emptypb.Empty, error) {
  569. s.serviceAccess.RLock()
  570. switch s.serviceStatus.Status {
  571. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  572. default:
  573. s.serviceAccess.RUnlock()
  574. return nil, os.ErrInvalid
  575. }
  576. boxService := s.instance.instance
  577. s.serviceAccess.RUnlock()
  578. outboundGroup, isLoaded := boxService.Outbound().Outbound(request.GroupTag)
  579. if !isLoaded {
  580. return nil, E.New("selector not found: ", request.GroupTag)
  581. }
  582. selector, isSelector := outboundGroup.(*group.Selector)
  583. if !isSelector {
  584. return nil, E.New("outbound is not a selector: ", request.GroupTag)
  585. }
  586. if !selector.SelectOutbound(request.OutboundTag) {
  587. return nil, E.New("outbound not found in selector: ", request.OutboundTag)
  588. }
  589. return &emptypb.Empty{}, nil
  590. }
  591. func (s *StartedService) SetGroupExpand(ctx context.Context, request *SetGroupExpandRequest) (*emptypb.Empty, error) {
  592. s.serviceAccess.RLock()
  593. switch s.serviceStatus.Status {
  594. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  595. default:
  596. s.serviceAccess.RUnlock()
  597. return nil, os.ErrInvalid
  598. }
  599. boxService := s.instance
  600. s.serviceAccess.RUnlock()
  601. if boxService.cacheFile != nil {
  602. err := boxService.cacheFile.StoreGroupExpand(request.GroupTag, request.IsExpand)
  603. if err != nil {
  604. return nil, err
  605. }
  606. }
  607. return &emptypb.Empty{}, nil
  608. }
  609. func (s *StartedService) GetSystemProxyStatus(ctx context.Context, empty *emptypb.Empty) (*SystemProxyStatus, error) {
  610. return s.handler.SystemProxyStatus()
  611. }
  612. func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *SetSystemProxyEnabledRequest) (*emptypb.Empty, error) {
  613. err := s.handler.SetSystemProxyEnabled(request.Enabled)
  614. if err != nil {
  615. return nil, err
  616. }
  617. return nil, err
  618. }
  619. func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[Connections]) error {
  620. err := s.waitForStarted(server.Context())
  621. if err != nil {
  622. return err
  623. }
  624. s.serviceAccess.RLock()
  625. boxService := s.instance
  626. s.serviceAccess.RUnlock()
  627. ticker := time.NewTicker(time.Duration(request.Interval))
  628. defer ticker.Stop()
  629. trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
  630. var (
  631. connections = make(map[uuid.UUID]*Connection)
  632. outConnections []*Connection
  633. )
  634. for {
  635. outConnections = outConnections[:0]
  636. for _, connection := range trafficManager.Connections() {
  637. outConnections = append(outConnections, newConnection(connections, connection, false))
  638. }
  639. for _, connection := range trafficManager.ClosedConnections() {
  640. outConnections = append(outConnections, newConnection(connections, connection, true))
  641. }
  642. err := server.Send(&Connections{Connections: outConnections})
  643. if err != nil {
  644. return err
  645. }
  646. select {
  647. case <-s.ctx.Done():
  648. return s.ctx.Err()
  649. case <-server.Context().Done():
  650. return server.Context().Err()
  651. case <-ticker.C:
  652. }
  653. }
  654. }
  655. func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol.TrackerMetadata, isClosed bool) *Connection {
  656. if oldConnection, loaded := connections[metadata.ID]; loaded {
  657. if isClosed {
  658. if oldConnection.ClosedAt == 0 {
  659. oldConnection.Uplink = 0
  660. oldConnection.Downlink = 0
  661. oldConnection.ClosedAt = metadata.ClosedAt.UnixMilli()
  662. }
  663. return oldConnection
  664. }
  665. lastUplink := oldConnection.UplinkTotal
  666. lastDownlink := oldConnection.DownlinkTotal
  667. uplinkTotal := metadata.Upload.Load()
  668. downlinkTotal := metadata.Download.Load()
  669. oldConnection.Uplink = uplinkTotal - lastUplink
  670. oldConnection.Downlink = downlinkTotal - lastDownlink
  671. oldConnection.UplinkTotal = uplinkTotal
  672. oldConnection.DownlinkTotal = downlinkTotal
  673. return oldConnection
  674. }
  675. var rule string
  676. if metadata.Rule != nil {
  677. rule = metadata.Rule.String()
  678. }
  679. uplinkTotal := metadata.Upload.Load()
  680. downlinkTotal := metadata.Download.Load()
  681. uplink := uplinkTotal
  682. downlink := downlinkTotal
  683. var closedAt int64
  684. if !metadata.ClosedAt.IsZero() {
  685. closedAt = metadata.ClosedAt.UnixMilli()
  686. uplink = 0
  687. downlink = 0
  688. }
  689. connection := &Connection{
  690. Id: metadata.ID.String(),
  691. Inbound: metadata.Metadata.Inbound,
  692. InboundType: metadata.Metadata.InboundType,
  693. IpVersion: int32(metadata.Metadata.IPVersion),
  694. Network: metadata.Metadata.Network,
  695. Source: metadata.Metadata.Source.String(),
  696. Destination: metadata.Metadata.Destination.String(),
  697. Domain: metadata.Metadata.Domain,
  698. Protocol: metadata.Metadata.Protocol,
  699. User: metadata.Metadata.User,
  700. FromOutbound: metadata.Metadata.Outbound,
  701. CreatedAt: metadata.CreatedAt.UnixMilli(),
  702. ClosedAt: closedAt,
  703. Uplink: uplink,
  704. Downlink: downlink,
  705. UplinkTotal: uplinkTotal,
  706. DownlinkTotal: downlinkTotal,
  707. Rule: rule,
  708. Outbound: metadata.Outbound,
  709. OutboundType: metadata.OutboundType,
  710. ChainList: metadata.Chain,
  711. }
  712. connections[metadata.ID] = connection
  713. return connection
  714. }
  715. func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {
  716. s.serviceAccess.RLock()
  717. switch s.serviceStatus.Status {
  718. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  719. default:
  720. s.serviceAccess.RUnlock()
  721. return nil, os.ErrInvalid
  722. }
  723. boxService := s.instance
  724. s.serviceAccess.RUnlock()
  725. targetConn := boxService.clashServer.(*clashapi.Server).TrafficManager().Connection(uuid.FromStringOrNil(request.Id))
  726. if targetConn != nil {
  727. targetConn.Close()
  728. }
  729. return &emptypb.Empty{}, nil
  730. }
  731. func (s *StartedService) CloseAllConnections(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  732. conntrack.Close()
  733. return &emptypb.Empty{}, nil
  734. }
  735. func (s *StartedService) GetDeprecatedWarnings(ctx context.Context, empty *emptypb.Empty) (*DeprecatedWarnings, error) {
  736. s.serviceAccess.RLock()
  737. if s.serviceStatus.Status != ServiceStatus_STARTED {
  738. s.serviceAccess.RUnlock()
  739. return nil, os.ErrInvalid
  740. }
  741. boxService := s.instance
  742. s.serviceAccess.RUnlock()
  743. notes := service.FromContext[deprecated.Manager](boxService.ctx).(*deprecatedManager).Get()
  744. return &DeprecatedWarnings{
  745. Warnings: common.Map(notes, func(it deprecated.Note) *DeprecatedWarning {
  746. return &DeprecatedWarning{
  747. Message: it.Message(),
  748. Impending: it.Impending(),
  749. MigrationLink: it.MigrationLink,
  750. }
  751. }),
  752. }, nil
  753. }
  754. func (s *StartedService) SubscribeHelperEvents(empty *emptypb.Empty, server grpc.ServerStreamingServer[HelperRequest]) error {
  755. return os.ErrInvalid
  756. }
  757. func (s *StartedService) SendHelperResponse(ctx context.Context, response *HelperResponse) (*emptypb.Empty, error) {
  758. return nil, os.ErrInvalid
  759. }
  760. func (s *StartedService) mustEmbedUnimplementedStartedServiceServer() {
  761. }
  762. func (s *StartedService) WriteMessage(level log.Level, message string) {
  763. item := &log.Entry{Level: level, Message: message}
  764. s.logSubscriber.Emit(item)
  765. s.logAccess.Lock()
  766. s.logLines.PushBack(item)
  767. if s.logLines.Len() > s.logMaxLines {
  768. s.logLines.Remove(s.logLines.Front())
  769. }
  770. s.logAccess.Unlock()
  771. if s.debug {
  772. s.handler.WriteDebugMessage(message)
  773. }
  774. }
  775. func (s *StartedService) Instance() *Instance {
  776. s.serviceAccess.RLock()
  777. defer s.serviceAccess.RUnlock()
  778. return s.instance
  779. }