1
0

started_service.go 24 KB


  1. package daemon
  2. import (
  3. "context"
  4. "os"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "github.com/sagernet/sing-box/adapter"
  9. "github.com/sagernet/sing-box/common/conntrack"
  10. "github.com/sagernet/sing-box/common/urltest"
  11. "github.com/sagernet/sing-box/experimental/clashapi"
  12. "github.com/sagernet/sing-box/experimental/clashapi/trafficontrol"
  13. "github.com/sagernet/sing-box/experimental/deprecated"
  14. "github.com/sagernet/sing-box/log"
  15. "github.com/sagernet/sing-box/protocol/group"
  16. "github.com/sagernet/sing/common"
  17. "github.com/sagernet/sing/common/batch"
  18. E "github.com/sagernet/sing/common/exceptions"
  19. "github.com/sagernet/sing/common/memory"
  20. "github.com/sagernet/sing/common/observable"
  21. "github.com/sagernet/sing/common/x/list"
  22. "github.com/sagernet/sing/service"
  23. "github.com/gofrs/uuid/v5"
  24. "google.golang.org/grpc"
  25. "google.golang.org/protobuf/types/known/emptypb"
  26. )
  27. var _ StartedServiceServer = (*StartedService)(nil)
  28. type StartedService struct {
  29. ctx context.Context
  30. // platform adapter.PlatformInterface
  31. handler PlatformHandler
  32. debug bool
  33. logMaxLines int
  34. // workingDirectory string
  35. // tempDirectory string
  36. // userID int
  37. // groupID int
  38. // systemProxyEnabled bool
  39. serviceAccess sync.RWMutex
  40. serviceStatus *ServiceStatus
  41. serviceStatusSubscriber *observable.Subscriber[*ServiceStatus]
  42. serviceStatusObserver *observable.Observer[*ServiceStatus]
  43. logAccess sync.RWMutex
  44. logLines list.List[*log.Entry]
  45. logSubscriber *observable.Subscriber[*log.Entry]
  46. logObserver *observable.Observer[*log.Entry]
  47. instance *Instance
  48. startedAt time.Time
  49. urlTestSubscriber *observable.Subscriber[struct{}]
  50. urlTestObserver *observable.Observer[struct{}]
  51. urlTestHistoryStorage *urltest.HistoryStorage
  52. clashModeSubscriber *observable.Subscriber[struct{}]
  53. clashModeObserver *observable.Observer[struct{}]
  54. }
  55. type ServiceOptions struct {
  56. Context context.Context
  57. // Platform adapter.PlatformInterface
  58. Handler PlatformHandler
  59. Debug bool
  60. LogMaxLines int
  61. // WorkingDirectory string
  62. // TempDirectory string
  63. // UserID int
  64. // GroupID int
  65. // SystemProxyEnabled bool
  66. }
  67. func NewStartedService(options ServiceOptions) *StartedService {
  68. s := &StartedService{
  69. ctx: options.Context,
  70. // platform: options.Platform,
  71. handler: options.Handler,
  72. debug: options.Debug,
  73. logMaxLines: options.LogMaxLines,
  74. // workingDirectory: options.WorkingDirectory,
  75. // tempDirectory: options.TempDirectory,
  76. // userID: options.UserID,
  77. // groupID: options.GroupID,
  78. // systemProxyEnabled: options.SystemProxyEnabled,
  79. serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
  80. serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
  81. logSubscriber: observable.NewSubscriber[*log.Entry](128),
  82. urlTestSubscriber: observable.NewSubscriber[struct{}](1),
  83. urlTestHistoryStorage: urltest.NewHistoryStorage(),
  84. clashModeSubscriber: observable.NewSubscriber[struct{}](1),
  85. }
  86. s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
  87. s.logObserver = observable.NewObserver(s.logSubscriber, 64)
  88. s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
  89. s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
  90. return s
  91. }
  92. func (s *StartedService) resetLogs() {
  93. s.logAccess.Lock()
  94. s.logLines = list.List[*log.Entry]{}
  95. s.logAccess.Unlock()
  96. s.logSubscriber.Emit(nil)
  97. }
  98. func (s *StartedService) updateStatus(newStatus ServiceStatus_Type) {
  99. statusObject := &ServiceStatus{Status: newStatus}
  100. s.serviceStatusSubscriber.Emit(statusObject)
  101. s.serviceStatus = statusObject
  102. }
  103. func (s *StartedService) updateStatusError(err error) error {
  104. statusObject := &ServiceStatus{Status: ServiceStatus_FATAL, ErrorMessage: err.Error()}
  105. s.serviceStatusSubscriber.Emit(statusObject)
  106. s.serviceStatus = statusObject
  107. s.serviceAccess.Unlock()
  108. return err
  109. }
  110. func (s *StartedService) waitForStarted(ctx context.Context) error {
  111. s.serviceAccess.RLock()
  112. currentStatus := s.serviceStatus.Status
  113. s.serviceAccess.RUnlock()
  114. switch currentStatus {
  115. case ServiceStatus_STARTED:
  116. return nil
  117. case ServiceStatus_STARTING:
  118. default:
  119. return os.ErrInvalid
  120. }
  121. subscription, done, err := s.serviceStatusObserver.Subscribe()
  122. if err != nil {
  123. return err
  124. }
  125. defer s.serviceStatusObserver.UnSubscribe(subscription)
  126. for {
  127. select {
  128. case <-ctx.Done():
  129. return ctx.Err()
  130. case <-s.ctx.Done():
  131. return s.ctx.Err()
  132. case status := <-subscription:
  133. switch status.Status {
  134. case ServiceStatus_STARTED:
  135. return nil
  136. case ServiceStatus_FATAL:
  137. return E.New(status.ErrorMessage)
  138. case ServiceStatus_IDLE, ServiceStatus_STOPPING:
  139. return os.ErrInvalid
  140. }
  141. case <-done:
  142. return os.ErrClosed
  143. }
  144. }
  145. }
  146. func (s *StartedService) StartOrReloadService(profileContent string, options *OverrideOptions) error {
  147. s.serviceAccess.Lock()
  148. switch s.serviceStatus.Status {
  149. case ServiceStatus_IDLE, ServiceStatus_STARTED, ServiceStatus_STARTING:
  150. default:
  151. s.serviceAccess.Unlock()
  152. return os.ErrInvalid
  153. }
  154. oldInstance := s.instance
  155. if oldInstance != nil {
  156. s.updateStatus(ServiceStatus_STOPPING)
  157. s.serviceAccess.Unlock()
  158. _ = oldInstance.Close()
  159. s.serviceAccess.Lock()
  160. }
  161. s.updateStatus(ServiceStatus_STARTING)
  162. s.resetLogs()
  163. instance, err := s.newInstance(profileContent, options)
  164. if err != nil {
  165. return s.updateStatusError(err)
  166. }
  167. s.instance = instance
  168. instance.urlTestHistoryStorage.SetHook(s.urlTestSubscriber)
  169. if instance.clashServer != nil {
  170. instance.clashServer.SetModeUpdateHook(s.clashModeSubscriber)
  171. }
  172. s.serviceAccess.Unlock()
  173. err = instance.Start()
  174. s.serviceAccess.Lock()
  175. if s.serviceStatus.Status != ServiceStatus_STARTING {
  176. s.serviceAccess.Unlock()
  177. return nil
  178. }
  179. if err != nil {
  180. return s.updateStatusError(err)
  181. }
  182. s.startedAt = time.Now()
  183. s.updateStatus(ServiceStatus_STARTED)
  184. s.serviceAccess.Unlock()
  185. runtime.GC()
  186. return nil
  187. }
  188. func (s *StartedService) CloseService() error {
  189. s.serviceAccess.Lock()
  190. switch s.serviceStatus.Status {
  191. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  192. default:
  193. s.serviceAccess.Unlock()
  194. return os.ErrInvalid
  195. }
  196. s.updateStatus(ServiceStatus_STOPPING)
  197. if s.instance != nil {
  198. err := s.instance.Close()
  199. if err != nil {
  200. return s.updateStatusError(err)
  201. }
  202. }
  203. s.instance = nil
  204. s.startedAt = time.Time{}
  205. s.updateStatus(ServiceStatus_IDLE)
  206. s.serviceAccess.Unlock()
  207. runtime.GC()
  208. return nil
  209. }
  210. func (s *StartedService) SetError(err error) {
  211. s.serviceAccess.Lock()
  212. s.updateStatusError(err)
  213. s.WriteMessage(log.LevelError, err.Error())
  214. }
  215. func (s *StartedService) StopService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  216. err := s.handler.ServiceStop()
  217. if err != nil {
  218. return nil, err
  219. }
  220. return &emptypb.Empty{}, nil
  221. }
  222. func (s *StartedService) ReloadService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  223. err := s.handler.ServiceReload()
  224. if err != nil {
  225. return nil, err
  226. }
  227. return &emptypb.Empty{}, nil
  228. }
  229. func (s *StartedService) SubscribeServiceStatus(empty *emptypb.Empty, server grpc.ServerStreamingServer[ServiceStatus]) error {
  230. subscription, done, err := s.serviceStatusObserver.Subscribe()
  231. if err != nil {
  232. return err
  233. }
  234. defer s.serviceStatusObserver.UnSubscribe(subscription)
  235. err = server.Send(s.serviceStatus)
  236. if err != nil {
  237. return err
  238. }
  239. for {
  240. select {
  241. case <-s.ctx.Done():
  242. return s.ctx.Err()
  243. case <-server.Context().Done():
  244. return server.Context().Err()
  245. case newStatus := <-subscription:
  246. err = server.Send(newStatus)
  247. if err != nil {
  248. return err
  249. }
  250. case <-done:
  251. return nil
  252. }
  253. }
  254. }
  255. func (s *StartedService) SubscribeLog(empty *emptypb.Empty, server grpc.ServerStreamingServer[Log]) error {
  256. var savedLines []*log.Entry
  257. s.logAccess.Lock()
  258. savedLines = make([]*log.Entry, 0, s.logLines.Len())
  259. for element := s.logLines.Front(); element != nil; element = element.Next() {
  260. savedLines = append(savedLines, element.Value)
  261. }
  262. subscription, done, err := s.logObserver.Subscribe()
  263. s.logAccess.Unlock()
  264. if err != nil {
  265. return err
  266. }
  267. defer s.logObserver.UnSubscribe(subscription)
  268. err = server.Send(&Log{
  269. Messages: common.Map(savedLines, func(it *log.Entry) *Log_Message {
  270. return &Log_Message{
  271. Level: LogLevel(it.Level),
  272. Message: it.Message,
  273. }
  274. }),
  275. Reset_: true,
  276. })
  277. if err != nil {
  278. return err
  279. }
  280. for {
  281. select {
  282. case <-s.ctx.Done():
  283. return s.ctx.Err()
  284. case <-server.Context().Done():
  285. return server.Context().Err()
  286. case message := <-subscription:
  287. var rawMessage Log
  288. if message == nil {
  289. rawMessage.Reset_ = true
  290. } else {
  291. rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
  292. Level: LogLevel(message.Level),
  293. Message: message.Message,
  294. })
  295. }
  296. fetch:
  297. for {
  298. select {
  299. case message = <-subscription:
  300. if message == nil {
  301. rawMessage.Messages = nil
  302. rawMessage.Reset_ = true
  303. } else {
  304. rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
  305. Level: LogLevel(message.Level),
  306. Message: message.Message,
  307. })
  308. }
  309. default:
  310. break fetch
  311. }
  312. }
  313. err = server.Send(&rawMessage)
  314. if err != nil {
  315. return err
  316. }
  317. case <-done:
  318. return nil
  319. }
  320. }
  321. }
  322. func (s *StartedService) GetDefaultLogLevel(ctx context.Context, empty *emptypb.Empty) (*DefaultLogLevel, error) {
  323. s.serviceAccess.RLock()
  324. switch s.serviceStatus.Status {
  325. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  326. default:
  327. s.serviceAccess.RUnlock()
  328. return nil, os.ErrInvalid
  329. }
  330. logLevel := s.instance.instance.LogFactory().Level()
  331. s.serviceAccess.RUnlock()
  332. return &DefaultLogLevel{Level: LogLevel(logLevel)}, nil
  333. }
  334. func (s *StartedService) ClearLogs(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  335. s.resetLogs()
  336. return &emptypb.Empty{}, nil
  337. }
  338. func (s *StartedService) SubscribeStatus(request *SubscribeStatusRequest, server grpc.ServerStreamingServer[Status]) error {
  339. interval := time.Duration(request.Interval)
  340. if interval <= 0 {
  341. interval = time.Second // Default to 1 second
  342. }
  343. ticker := time.NewTicker(interval)
  344. defer ticker.Stop()
  345. status := s.readStatus()
  346. uploadTotal := status.UplinkTotal
  347. downloadTotal := status.DownlinkTotal
  348. for {
  349. err := server.Send(status)
  350. if err != nil {
  351. return err
  352. }
  353. select {
  354. case <-s.ctx.Done():
  355. return s.ctx.Err()
  356. case <-server.Context().Done():
  357. return server.Context().Err()
  358. case <-ticker.C:
  359. }
  360. status = s.readStatus()
  361. upload := status.UplinkTotal - uploadTotal
  362. download := status.DownlinkTotal - downloadTotal
  363. uploadTotal = status.UplinkTotal
  364. downloadTotal = status.DownlinkTotal
  365. status.Uplink = upload
  366. status.Downlink = download
  367. }
  368. }
  369. func (s *StartedService) readStatus() *Status {
  370. var status Status
  371. status.Memory = memory.Inuse()
  372. status.Goroutines = int32(runtime.NumGoroutine())
  373. status.ConnectionsOut = int32(conntrack.Count())
  374. s.serviceAccess.RLock()
  375. nowService := s.instance
  376. s.serviceAccess.RUnlock()
  377. if nowService != nil {
  378. if clashServer := nowService.clashServer; clashServer != nil {
  379. status.TrafficAvailable = true
  380. trafficManager := clashServer.(*clashapi.Server).TrafficManager()
  381. status.UplinkTotal, status.DownlinkTotal = trafficManager.Total()
  382. status.ConnectionsIn = int32(trafficManager.ConnectionsLen())
  383. }
  384. }
  385. return &status
  386. }
  387. func (s *StartedService) SubscribeGroups(empty *emptypb.Empty, server grpc.ServerStreamingServer[Groups]) error {
  388. err := s.waitForStarted(server.Context())
  389. if err != nil {
  390. return err
  391. }
  392. subscription, done, err := s.urlTestObserver.Subscribe()
  393. if err != nil {
  394. return err
  395. }
  396. defer s.urlTestObserver.UnSubscribe(subscription)
  397. for {
  398. s.serviceAccess.RLock()
  399. if s.serviceStatus.Status != ServiceStatus_STARTED {
  400. s.serviceAccess.RUnlock()
  401. return os.ErrInvalid
  402. }
  403. groups := s.readGroups()
  404. s.serviceAccess.RUnlock()
  405. err = server.Send(groups)
  406. if err != nil {
  407. return err
  408. }
  409. select {
  410. case <-subscription:
  411. case <-s.ctx.Done():
  412. return s.ctx.Err()
  413. case <-server.Context().Done():
  414. return server.Context().Err()
  415. case <-done:
  416. return nil
  417. }
  418. }
  419. }
  420. func (s *StartedService) readGroups() *Groups {
  421. historyStorage := s.instance.urlTestHistoryStorage
  422. boxService := s.instance
  423. outbounds := boxService.instance.Outbound().Outbounds()
  424. var iGroups []adapter.OutboundGroup
  425. for _, it := range outbounds {
  426. if group, isGroup := it.(adapter.OutboundGroup); isGroup {
  427. iGroups = append(iGroups, group)
  428. }
  429. }
  430. var gs Groups
  431. for _, iGroup := range iGroups {
  432. var g Group
  433. g.Tag = iGroup.Tag()
  434. g.Type = iGroup.Type()
  435. _, g.Selectable = iGroup.(*group.Selector)
  436. g.Selected = iGroup.Now()
  437. if boxService.cacheFile != nil {
  438. if isExpand, loaded := boxService.cacheFile.LoadGroupExpand(g.Tag); loaded {
  439. g.IsExpand = isExpand
  440. }
  441. }
  442. for _, itemTag := range iGroup.All() {
  443. itemOutbound, isLoaded := boxService.instance.Outbound().Outbound(itemTag)
  444. if !isLoaded {
  445. continue
  446. }
  447. var item GroupItem
  448. item.Tag = itemTag
  449. item.Type = itemOutbound.Type()
  450. if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(itemOutbound)); history != nil {
  451. item.UrlTestTime = history.Time.Unix()
  452. item.UrlTestDelay = int32(history.Delay)
  453. }
  454. g.Items = append(g.Items, &item)
  455. }
  456. if len(g.Items) < 2 {
  457. continue
  458. }
  459. gs.Group = append(gs.Group, &g)
  460. }
  461. return &gs
  462. }
  463. func (s *StartedService) GetClashModeStatus(ctx context.Context, empty *emptypb.Empty) (*ClashModeStatus, error) {
  464. s.serviceAccess.RLock()
  465. if s.serviceStatus.Status != ServiceStatus_STARTED {
  466. s.serviceAccess.RUnlock()
  467. return nil, os.ErrInvalid
  468. }
  469. clashServer := s.instance.clashServer
  470. s.serviceAccess.RUnlock()
  471. if clashServer == nil {
  472. return nil, os.ErrInvalid
  473. }
  474. return &ClashModeStatus{
  475. ModeList: clashServer.ModeList(),
  476. CurrentMode: clashServer.Mode(),
  477. }, nil
  478. }
  479. func (s *StartedService) SubscribeClashMode(empty *emptypb.Empty, server grpc.ServerStreamingServer[ClashMode]) error {
  480. err := s.waitForStarted(server.Context())
  481. if err != nil {
  482. return err
  483. }
  484. subscription, done, err := s.clashModeObserver.Subscribe()
  485. if err != nil {
  486. return err
  487. }
  488. defer s.clashModeObserver.UnSubscribe(subscription)
  489. for {
  490. s.serviceAccess.RLock()
  491. if s.serviceStatus.Status != ServiceStatus_STARTED {
  492. s.serviceAccess.RUnlock()
  493. return os.ErrInvalid
  494. }
  495. message := &ClashMode{Mode: s.instance.clashServer.Mode()}
  496. s.serviceAccess.RUnlock()
  497. err = server.Send(message)
  498. if err != nil {
  499. return err
  500. }
  501. select {
  502. case <-subscription:
  503. case <-s.ctx.Done():
  504. return s.ctx.Err()
  505. case <-server.Context().Done():
  506. return server.Context().Err()
  507. case <-done:
  508. return nil
  509. }
  510. }
  511. }
  512. func (s *StartedService) SetClashMode(ctx context.Context, request *ClashMode) (*emptypb.Empty, error) {
  513. s.serviceAccess.RLock()
  514. if s.serviceStatus.Status != ServiceStatus_STARTED {
  515. s.serviceAccess.RUnlock()
  516. return nil, os.ErrInvalid
  517. }
  518. clashServer := s.instance.clashServer
  519. s.serviceAccess.RUnlock()
  520. clashServer.(*clashapi.Server).SetMode(request.Mode)
  521. return &emptypb.Empty{}, nil
  522. }
  523. func (s *StartedService) URLTest(ctx context.Context, request *URLTestRequest) (*emptypb.Empty, error) {
  524. s.serviceAccess.RLock()
  525. if s.serviceStatus.Status != ServiceStatus_STARTED {
  526. s.serviceAccess.RUnlock()
  527. return nil, os.ErrInvalid
  528. }
  529. boxService := s.instance
  530. s.serviceAccess.RUnlock()
  531. groupTag := request.OutboundTag
  532. abstractOutboundGroup, isLoaded := boxService.instance.Outbound().Outbound(groupTag)
  533. if !isLoaded {
  534. return nil, E.New("outbound group not found: ", groupTag)
  535. }
  536. outboundGroup, isOutboundGroup := abstractOutboundGroup.(adapter.OutboundGroup)
  537. if !isOutboundGroup {
  538. return nil, E.New("outbound is not a group: ", groupTag)
  539. }
  540. urlTest, isURLTest := abstractOutboundGroup.(*group.URLTest)
  541. if isURLTest {
  542. go urlTest.CheckOutbounds()
  543. } else {
  544. historyStorage := boxService.urlTestHistoryStorage
  545. outbounds := common.Filter(common.Map(outboundGroup.All(), func(it string) adapter.Outbound {
  546. itOutbound, _ := boxService.instance.Outbound().Outbound(it)
  547. return itOutbound
  548. }), func(it adapter.Outbound) bool {
  549. if it == nil {
  550. return false
  551. }
  552. _, isGroup := it.(adapter.OutboundGroup)
  553. if isGroup {
  554. return false
  555. }
  556. return true
  557. })
  558. b, _ := batch.New(boxService.ctx, batch.WithConcurrencyNum[any](10))
  559. for _, detour := range outbounds {
  560. outboundToTest := detour
  561. outboundTag := outboundToTest.Tag()
  562. b.Go(outboundTag, func() (any, error) {
  563. t, err := urltest.URLTest(boxService.ctx, "", outboundToTest)
  564. if err != nil {
  565. historyStorage.DeleteURLTestHistory(outboundTag)
  566. } else {
  567. historyStorage.StoreURLTestHistory(outboundTag, &adapter.URLTestHistory{
  568. Time: time.Now(),
  569. Delay: t,
  570. })
  571. }
  572. return nil, nil
  573. })
  574. }
  575. }
  576. return &emptypb.Empty{}, nil
  577. }
  578. func (s *StartedService) SelectOutbound(ctx context.Context, request *SelectOutboundRequest) (*emptypb.Empty, error) {
  579. s.serviceAccess.RLock()
  580. switch s.serviceStatus.Status {
  581. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  582. default:
  583. s.serviceAccess.RUnlock()
  584. return nil, os.ErrInvalid
  585. }
  586. boxService := s.instance.instance
  587. s.serviceAccess.RUnlock()
  588. outboundGroup, isLoaded := boxService.Outbound().Outbound(request.GroupTag)
  589. if !isLoaded {
  590. return nil, E.New("selector not found: ", request.GroupTag)
  591. }
  592. selector, isSelector := outboundGroup.(*group.Selector)
  593. if !isSelector {
  594. return nil, E.New("outbound is not a selector: ", request.GroupTag)
  595. }
  596. if !selector.SelectOutbound(request.OutboundTag) {
  597. return nil, E.New("outbound not found in selector: ", request.OutboundTag)
  598. }
  599. s.urlTestObserver.Emit(struct{}{})
  600. return &emptypb.Empty{}, nil
  601. }
  602. func (s *StartedService) SetGroupExpand(ctx context.Context, request *SetGroupExpandRequest) (*emptypb.Empty, error) {
  603. s.serviceAccess.RLock()
  604. switch s.serviceStatus.Status {
  605. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  606. default:
  607. s.serviceAccess.RUnlock()
  608. return nil, os.ErrInvalid
  609. }
  610. boxService := s.instance
  611. s.serviceAccess.RUnlock()
  612. if boxService.cacheFile != nil {
  613. err := boxService.cacheFile.StoreGroupExpand(request.GroupTag, request.IsExpand)
  614. if err != nil {
  615. return nil, err
  616. }
  617. }
  618. return &emptypb.Empty{}, nil
  619. }
  620. func (s *StartedService) GetSystemProxyStatus(ctx context.Context, empty *emptypb.Empty) (*SystemProxyStatus, error) {
  621. return s.handler.SystemProxyStatus()
  622. }
  623. func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *SetSystemProxyEnabledRequest) (*emptypb.Empty, error) {
  624. err := s.handler.SetSystemProxyEnabled(request.Enabled)
  625. if err != nil {
  626. return nil, err
  627. }
  628. return nil, err
  629. }
  630. func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[Connections]) error {
  631. err := s.waitForStarted(server.Context())
  632. if err != nil {
  633. return err
  634. }
  635. s.serviceAccess.RLock()
  636. boxService := s.instance
  637. s.serviceAccess.RUnlock()
  638. ticker := time.NewTicker(time.Duration(request.Interval))
  639. defer ticker.Stop()
  640. trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
  641. var (
  642. connections = make(map[uuid.UUID]*Connection)
  643. outConnections []*Connection
  644. )
  645. for {
  646. outConnections = outConnections[:0]
  647. for _, connection := range trafficManager.Connections() {
  648. outConnections = append(outConnections, newConnection(connections, connection, false))
  649. }
  650. for _, connection := range trafficManager.ClosedConnections() {
  651. outConnections = append(outConnections, newConnection(connections, connection, true))
  652. }
  653. err := server.Send(&Connections{Connections: outConnections})
  654. if err != nil {
  655. return err
  656. }
  657. select {
  658. case <-s.ctx.Done():
  659. return s.ctx.Err()
  660. case <-server.Context().Done():
  661. return server.Context().Err()
  662. case <-ticker.C:
  663. }
  664. }
  665. }
  666. func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol.TrackerMetadata, isClosed bool) *Connection {
  667. if oldConnection, loaded := connections[metadata.ID]; loaded {
  668. if isClosed {
  669. if oldConnection.ClosedAt == 0 {
  670. oldConnection.Uplink = 0
  671. oldConnection.Downlink = 0
  672. oldConnection.ClosedAt = metadata.ClosedAt.UnixMilli()
  673. }
  674. return oldConnection
  675. }
  676. lastUplink := oldConnection.UplinkTotal
  677. lastDownlink := oldConnection.DownlinkTotal
  678. uplinkTotal := metadata.Upload.Load()
  679. downlinkTotal := metadata.Download.Load()
  680. oldConnection.Uplink = uplinkTotal - lastUplink
  681. oldConnection.Downlink = downlinkTotal - lastDownlink
  682. oldConnection.UplinkTotal = uplinkTotal
  683. oldConnection.DownlinkTotal = downlinkTotal
  684. return oldConnection
  685. }
  686. var rule string
  687. if metadata.Rule != nil {
  688. rule = metadata.Rule.String()
  689. }
  690. uplinkTotal := metadata.Upload.Load()
  691. downlinkTotal := metadata.Download.Load()
  692. uplink := uplinkTotal
  693. downlink := downlinkTotal
  694. var closedAt int64
  695. if !metadata.ClosedAt.IsZero() {
  696. closedAt = metadata.ClosedAt.UnixMilli()
  697. uplink = 0
  698. downlink = 0
  699. }
  700. connection := &Connection{
  701. Id: metadata.ID.String(),
  702. Inbound: metadata.Metadata.Inbound,
  703. InboundType: metadata.Metadata.InboundType,
  704. IpVersion: int32(metadata.Metadata.IPVersion),
  705. Network: metadata.Metadata.Network,
  706. Source: metadata.Metadata.Source.String(),
  707. Destination: metadata.Metadata.Destination.String(),
  708. Domain: metadata.Metadata.Domain,
  709. Protocol: metadata.Metadata.Protocol,
  710. User: metadata.Metadata.User,
  711. FromOutbound: metadata.Metadata.Outbound,
  712. CreatedAt: metadata.CreatedAt.UnixMilli(),
  713. ClosedAt: closedAt,
  714. Uplink: uplink,
  715. Downlink: downlink,
  716. UplinkTotal: uplinkTotal,
  717. DownlinkTotal: downlinkTotal,
  718. Rule: rule,
  719. Outbound: metadata.Outbound,
  720. OutboundType: metadata.OutboundType,
  721. ChainList: metadata.Chain,
  722. }
  723. connections[metadata.ID] = connection
  724. return connection
  725. }
  726. func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {
  727. s.serviceAccess.RLock()
  728. switch s.serviceStatus.Status {
  729. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  730. default:
  731. s.serviceAccess.RUnlock()
  732. return nil, os.ErrInvalid
  733. }
  734. boxService := s.instance
  735. s.serviceAccess.RUnlock()
  736. targetConn := boxService.clashServer.(*clashapi.Server).TrafficManager().Connection(uuid.FromStringOrNil(request.Id))
  737. if targetConn != nil {
  738. targetConn.Close()
  739. }
  740. return &emptypb.Empty{}, nil
  741. }
  742. func (s *StartedService) CloseAllConnections(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  743. conntrack.Close()
  744. return &emptypb.Empty{}, nil
  745. }
  746. func (s *StartedService) GetDeprecatedWarnings(ctx context.Context, empty *emptypb.Empty) (*DeprecatedWarnings, error) {
  747. s.serviceAccess.RLock()
  748. if s.serviceStatus.Status != ServiceStatus_STARTED {
  749. s.serviceAccess.RUnlock()
  750. return nil, os.ErrInvalid
  751. }
  752. boxService := s.instance
  753. s.serviceAccess.RUnlock()
  754. notes := service.FromContext[deprecated.Manager](boxService.ctx).(*deprecatedManager).Get()
  755. return &DeprecatedWarnings{
  756. Warnings: common.Map(notes, func(it deprecated.Note) *DeprecatedWarning {
  757. return &DeprecatedWarning{
  758. Message: it.Message(),
  759. Impending: it.Impending(),
  760. MigrationLink: it.MigrationLink,
  761. }
  762. }),
  763. }, nil
  764. }
  765. func (s *StartedService) GetStartedAt(ctx context.Context, empty *emptypb.Empty) (*StartedAt, error) {
  766. s.serviceAccess.RLock()
  767. defer s.serviceAccess.RUnlock()
  768. return &StartedAt{StartedAt: s.startedAt.UnixMilli()}, nil
  769. }
  770. func (s *StartedService) SubscribeHelperEvents(empty *emptypb.Empty, server grpc.ServerStreamingServer[HelperRequest]) error {
  771. return os.ErrInvalid
  772. }
  773. func (s *StartedService) SendHelperResponse(ctx context.Context, response *HelperResponse) (*emptypb.Empty, error) {
  774. return nil, os.ErrInvalid
  775. }
  776. func (s *StartedService) mustEmbedUnimplementedStartedServiceServer() {
  777. }
  778. func (s *StartedService) WriteMessage(level log.Level, message string) {
  779. item := &log.Entry{Level: level, Message: message}
  780. s.logAccess.Lock()
  781. s.logLines.PushBack(item)
  782. if s.logLines.Len() > s.logMaxLines {
  783. s.logLines.Remove(s.logLines.Front())
  784. }
  785. s.logAccess.Unlock()
  786. s.logSubscriber.Emit(item)
  787. if s.debug {
  788. s.handler.WriteDebugMessage(message)
  789. }
  790. }
  791. func (s *StartedService) Instance() *Instance {
  792. s.serviceAccess.RLock()
  793. defer s.serviceAccess.RUnlock()
  794. return s.instance
  795. }