started_service.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774
  1. package daemon
  2. import (
  3. "context"
  4. "os"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "github.com/sagernet/sing-box/adapter"
  9. "github.com/sagernet/sing-box/common/conntrack"
  10. "github.com/sagernet/sing-box/common/urltest"
  11. "github.com/sagernet/sing-box/experimental/clashapi"
  12. "github.com/sagernet/sing-box/experimental/clashapi/trafficontrol"
  13. "github.com/sagernet/sing-box/experimental/deprecated"
  14. "github.com/sagernet/sing-box/log"
  15. "github.com/sagernet/sing-box/protocol/group"
  16. "github.com/sagernet/sing/common"
  17. "github.com/sagernet/sing/common/batch"
  18. E "github.com/sagernet/sing/common/exceptions"
  19. "github.com/sagernet/sing/common/memory"
  20. "github.com/sagernet/sing/common/observable"
  21. "github.com/sagernet/sing/common/x/list"
  22. "github.com/sagernet/sing/service"
  23. "github.com/gofrs/uuid/v5"
  24. "google.golang.org/grpc"
  25. "google.golang.org/protobuf/types/known/emptypb"
  26. )
  27. var _ StartedServiceServer = (*StartedService)(nil)
  28. type StartedService struct {
  29. ctx context.Context
  30. platform PlatformInterface
  31. platformHandler PlatformHandler
  32. debug bool
  33. logMaxLines int
  34. workingDirectory string
  35. tempDirectory string
  36. userID int
  37. groupID int
  38. systemProxyEnabled bool
  39. serviceAccess sync.RWMutex
  40. serviceStatus *ServiceStatus
  41. serviceStatusSubscriber *observable.Subscriber[*ServiceStatus]
  42. serviceStatusObserver *observable.Observer[*ServiceStatus]
  43. logAccess sync.RWMutex
  44. logLines list.List[*log.Entry]
  45. logSubscriber *observable.Subscriber[*log.Entry]
  46. logObserver *observable.Observer[*log.Entry]
  47. instance *Instance
  48. urlTestSubscriber *observable.Subscriber[struct{}]
  49. urlTestObserver *observable.Observer[struct{}]
  50. urlTestHistoryStorage *urltest.HistoryStorage
  51. clashModeSubscriber *observable.Subscriber[struct{}]
  52. clashModeObserver *observable.Observer[struct{}]
  53. }
  54. type ServiceOptions struct {
  55. Context context.Context
  56. Platform PlatformInterface
  57. PlatformHandler PlatformHandler
  58. Debug bool
  59. LogMaxLines int
  60. WorkingDirectory string
  61. TempDirectory string
  62. UserID int
  63. GroupID int
  64. SystemProxyEnabled bool
  65. }
  66. func NewStartedService(options ServiceOptions) *StartedService {
  67. s := &StartedService{
  68. ctx: options.Context,
  69. platform: options.Platform,
  70. platformHandler: options.PlatformHandler,
  71. debug: options.Debug,
  72. logMaxLines: options.LogMaxLines,
  73. workingDirectory: options.WorkingDirectory,
  74. tempDirectory: options.TempDirectory,
  75. userID: options.UserID,
  76. groupID: options.GroupID,
  77. systemProxyEnabled: options.SystemProxyEnabled,
  78. serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
  79. serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
  80. logSubscriber: observable.NewSubscriber[*log.Entry](128),
  81. urlTestSubscriber: observable.NewSubscriber[struct{}](1),
  82. urlTestHistoryStorage: urltest.NewHistoryStorage(),
  83. clashModeSubscriber: observable.NewSubscriber[struct{}](1),
  84. }
  85. s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
  86. s.logObserver = observable.NewObserver(s.logSubscriber, 64)
  87. s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
  88. s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
  89. return s
  90. }
  91. func (s *StartedService) resetLogs() {
  92. s.logAccess.Lock()
  93. s.logLines = list.List[*log.Entry]{}
  94. s.logAccess.Unlock()
  95. s.logSubscriber.Emit(nil)
  96. }
  97. func (s *StartedService) updateStatus(newStatus ServiceStatus_Type) {
  98. statusObject := &ServiceStatus{Status: newStatus}
  99. s.serviceStatusSubscriber.Emit(statusObject)
  100. s.serviceStatus = statusObject
  101. }
  102. func (s *StartedService) updateStatusError(err error) error {
  103. statusObject := &ServiceStatus{Status: ServiceStatus_FATAL, ErrorMessage: err.Error()}
  104. s.serviceStatusSubscriber.Emit(statusObject)
  105. s.serviceStatus = statusObject
  106. s.serviceAccess.Unlock()
  107. return err
  108. }
  109. func (s *StartedService) StartOrReloadService(profileContent string, options *OverrideOptions) error {
  110. s.serviceAccess.Lock()
  111. switch s.serviceStatus.Status {
  112. case ServiceStatus_IDLE, ServiceStatus_STARTED, ServiceStatus_STARTING:
  113. default:
  114. s.serviceAccess.Unlock()
  115. return os.ErrInvalid
  116. }
  117. s.updateStatus(ServiceStatus_STARTING)
  118. s.resetLogs()
  119. instance, err := s.newInstance(profileContent, options)
  120. if err != nil {
  121. return s.updateStatusError(err)
  122. }
  123. s.instance = instance
  124. s.serviceAccess.Unlock()
  125. err = instance.Start()
  126. s.serviceAccess.Lock()
  127. if s.serviceStatus.Status != ServiceStatus_STARTING {
  128. s.serviceAccess.Unlock()
  129. return nil
  130. }
  131. if err != nil {
  132. return s.updateStatusError(err)
  133. }
  134. s.updateStatus(ServiceStatus_STARTED)
  135. s.serviceAccess.Unlock()
  136. runtime.GC()
  137. return nil
  138. }
  139. func (s *StartedService) CloseService() error {
  140. s.serviceAccess.Lock()
  141. switch s.serviceStatus.Status {
  142. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  143. default:
  144. s.serviceAccess.Unlock()
  145. return os.ErrInvalid
  146. }
  147. s.updateStatus(ServiceStatus_STOPPING)
  148. if s.instance != nil {
  149. err := s.instance.Close()
  150. if err != nil {
  151. return s.updateStatusError(err)
  152. }
  153. }
  154. s.instance = nil
  155. s.updateStatus(ServiceStatus_IDLE)
  156. s.serviceAccess.Unlock()
  157. runtime.GC()
  158. return nil
  159. }
  160. func (s *StartedService) SetError(err error) {
  161. s.serviceAccess.Lock()
  162. s.updateStatusError(err)
  163. s.serviceAccess.Unlock()
  164. s.WriteMessage(log.LevelError, err.Error())
  165. }
  166. func (s *StartedService) StopService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  167. err := s.platformHandler.ServiceStop()
  168. if err != nil {
  169. return nil, err
  170. }
  171. return &emptypb.Empty{}, nil
  172. }
  173. func (s *StartedService) ReloadService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  174. err := s.platformHandler.ServiceReload()
  175. if err != nil {
  176. return nil, err
  177. }
  178. return &emptypb.Empty{}, nil
  179. }
  180. func (s *StartedService) SubscribeServiceStatus(empty *emptypb.Empty, server grpc.ServerStreamingServer[ServiceStatus]) error {
  181. subscription, done, err := s.serviceStatusObserver.Subscribe()
  182. if err != nil {
  183. return err
  184. }
  185. defer s.serviceStatusObserver.UnSubscribe(subscription)
  186. err = server.Send(s.serviceStatus)
  187. if err != nil {
  188. return err
  189. }
  190. for {
  191. select {
  192. case <-s.ctx.Done():
  193. return s.ctx.Err()
  194. case <-server.Context().Done():
  195. return server.Context().Err()
  196. case newStatus := <-subscription:
  197. err = server.Send(newStatus)
  198. if err != nil {
  199. return err
  200. }
  201. case <-done:
  202. return nil
  203. }
  204. }
  205. }
  206. func (s *StartedService) SubscribeLog(empty *emptypb.Empty, server grpc.ServerStreamingServer[Log]) error {
  207. var savedLines []*log.Entry
  208. s.logAccess.Lock()
  209. savedLines = make([]*log.Entry, 0, s.logLines.Len())
  210. for element := s.logLines.Front(); element != nil; element = element.Next() {
  211. savedLines = append(savedLines, element.Value)
  212. }
  213. s.logAccess.Unlock()
  214. subscription, done, err := s.logObserver.Subscribe()
  215. if err != nil {
  216. return err
  217. }
  218. defer s.logObserver.UnSubscribe(subscription)
  219. err = server.Send(&Log{
  220. Messages: common.Map(savedLines, func(it *log.Entry) *Log_Message {
  221. return &Log_Message{
  222. Level: LogLevel(it.Level),
  223. Message: it.Message,
  224. }
  225. }),
  226. Reset_: true,
  227. })
  228. if err != nil {
  229. return err
  230. }
  231. for {
  232. select {
  233. case <-s.ctx.Done():
  234. return s.ctx.Err()
  235. case <-server.Context().Done():
  236. return server.Context().Err()
  237. case message := <-subscription:
  238. if message == nil {
  239. err = server.Send(&Log{Reset_: true})
  240. if err != nil {
  241. return err
  242. }
  243. continue
  244. }
  245. messages := []*Log_Message{{
  246. Level: LogLevel(message.Level),
  247. Message: message.Message,
  248. }}
  249. fetch:
  250. for {
  251. select {
  252. case message = <-subscription:
  253. messages = append(messages, &Log_Message{
  254. Level: LogLevel(message.Level),
  255. Message: message.Message,
  256. })
  257. default:
  258. break fetch
  259. }
  260. }
  261. err = server.Send(&Log{Messages: messages})
  262. if err != nil {
  263. return err
  264. }
  265. case <-done:
  266. return nil
  267. }
  268. }
  269. }
  270. func (s *StartedService) GetDefaultLogLevel(ctx context.Context, empty *emptypb.Empty) (*DefaultLogLevel, error) {
  271. s.serviceAccess.RLock()
  272. switch s.serviceStatus.Status {
  273. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  274. default:
  275. s.serviceAccess.RUnlock()
  276. return nil, os.ErrInvalid
  277. }
  278. logLevel := s.instance.instance.LogFactory().Level()
  279. s.serviceAccess.RUnlock()
  280. return &DefaultLogLevel{Level: LogLevel(logLevel)}, nil
  281. }
  282. func (s *StartedService) SubscribeStatus(request *SubscribeStatusRequest, server grpc.ServerStreamingServer[Status]) error {
  283. interval := time.Duration(request.Interval)
  284. if interval <= 0 {
  285. interval = time.Second // Default to 1 second
  286. }
  287. ticker := time.NewTicker(interval)
  288. defer ticker.Stop()
  289. status := s.readStatus()
  290. uploadTotal := status.UplinkTotal
  291. downloadTotal := status.DownlinkTotal
  292. for {
  293. err := server.Send(status)
  294. if err != nil {
  295. return err
  296. }
  297. select {
  298. case <-s.ctx.Done():
  299. return s.ctx.Err()
  300. case <-server.Context().Done():
  301. return server.Context().Err()
  302. case <-ticker.C:
  303. }
  304. status = s.readStatus()
  305. upload := status.UplinkTotal - uploadTotal
  306. download := status.DownlinkTotal - downloadTotal
  307. uploadTotal = status.UplinkTotal
  308. downloadTotal = status.DownlinkTotal
  309. status.Uplink = upload
  310. status.Downlink = download
  311. }
  312. }
  313. func (s *StartedService) readStatus() *Status {
  314. var status Status
  315. status.Memory = memory.Inuse()
  316. status.Goroutines = int32(runtime.NumGoroutine())
  317. status.ConnectionsOut = int32(conntrack.Count())
  318. nowService := s.instance
  319. if nowService != nil {
  320. if clashServer := nowService.clashServer; clashServer != nil {
  321. status.TrafficAvailable = true
  322. trafficManager := clashServer.(*clashapi.Server).TrafficManager()
  323. status.UplinkTotal, status.DownlinkTotal = trafficManager.Total()
  324. status.ConnectionsIn = int32(trafficManager.ConnectionsLen())
  325. }
  326. }
  327. return &status
  328. }
  329. func (s *StartedService) SubscribeGroups(empty *emptypb.Empty, server grpc.ServerStreamingServer[Groups]) error {
  330. subscription, done, err := s.urlTestObserver.Subscribe()
  331. if err != nil {
  332. return err
  333. }
  334. defer s.urlTestObserver.UnSubscribe(subscription)
  335. for {
  336. s.serviceAccess.RLock()
  337. switch s.serviceStatus.Status {
  338. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  339. groups := s.readGroups()
  340. s.serviceAccess.RUnlock()
  341. err = server.Send(groups)
  342. if err != nil {
  343. return err
  344. }
  345. default:
  346. s.serviceAccess.RUnlock()
  347. return os.ErrInvalid
  348. }
  349. select {
  350. case <-subscription:
  351. case <-s.ctx.Done():
  352. return s.ctx.Err()
  353. case <-server.Context().Done():
  354. return server.Context().Err()
  355. case <-done:
  356. return nil
  357. }
  358. }
  359. }
  360. func (s *StartedService) readGroups() *Groups {
  361. historyStorage := s.instance.urlTestHistoryStorage
  362. boxService := s.instance
  363. outbounds := boxService.instance.Outbound().Outbounds()
  364. var iGroups []adapter.OutboundGroup
  365. for _, it := range outbounds {
  366. if group, isGroup := it.(adapter.OutboundGroup); isGroup {
  367. iGroups = append(iGroups, group)
  368. }
  369. }
  370. var gs Groups
  371. for _, iGroup := range iGroups {
  372. var g Group
  373. g.Tag = iGroup.Tag()
  374. g.Type = iGroup.Type()
  375. _, g.Selectable = iGroup.(*group.Selector)
  376. g.Selected = iGroup.Now()
  377. if boxService.cacheFile != nil {
  378. if isExpand, loaded := boxService.cacheFile.LoadGroupExpand(g.Tag); loaded {
  379. g.IsExpand = isExpand
  380. }
  381. }
  382. for _, itemTag := range iGroup.All() {
  383. itemOutbound, isLoaded := boxService.instance.Outbound().Outbound(itemTag)
  384. if !isLoaded {
  385. continue
  386. }
  387. var item GroupItem
  388. item.Tag = itemTag
  389. item.Type = itemOutbound.Type()
  390. if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(itemOutbound)); history != nil {
  391. item.UrlTestTime = history.Time.Unix()
  392. item.UrlTestDelay = int32(history.Delay)
  393. }
  394. g.Items = append(g.Items, &item)
  395. }
  396. if len(g.Items) < 2 {
  397. continue
  398. }
  399. gs.Group = append(gs.Group, &g)
  400. }
  401. return &gs
  402. }
  403. func (s *StartedService) GetClashModeStatus(ctx context.Context, empty *emptypb.Empty) (*ClashModeStatus, error) {
  404. s.serviceAccess.RLock()
  405. if s.serviceStatus.Status != ServiceStatus_STARTED {
  406. s.serviceAccess.RUnlock()
  407. return nil, os.ErrInvalid
  408. }
  409. clashServer := s.instance.clashServer
  410. s.serviceAccess.RUnlock()
  411. if clashServer == nil {
  412. return nil, os.ErrInvalid
  413. }
  414. return &ClashModeStatus{
  415. ModeList: clashServer.ModeList(),
  416. CurrentMode: clashServer.Mode(),
  417. }, nil
  418. }
  419. func (s *StartedService) SubscribeClashMode(empty *emptypb.Empty, server grpc.ServerStreamingServer[ClashMode]) error {
  420. subscription, done, err := s.clashModeObserver.Subscribe()
  421. if err != nil {
  422. return err
  423. }
  424. defer s.clashModeObserver.UnSubscribe(subscription)
  425. for {
  426. select {
  427. case <-subscription:
  428. case <-s.ctx.Done():
  429. return s.ctx.Err()
  430. case <-server.Context().Done():
  431. return server.Context().Err()
  432. case <-done:
  433. return nil
  434. }
  435. s.serviceAccess.RLock()
  436. if s.serviceStatus.Status != ServiceStatus_STARTED {
  437. return nil
  438. }
  439. message := &ClashMode{Mode: s.instance.clashServer.Mode()}
  440. s.serviceAccess.RUnlock()
  441. err = server.Send(message)
  442. if err != nil {
  443. return err
  444. }
  445. }
  446. }
  447. func (s *StartedService) SetClashMode(ctx context.Context, request *ClashMode) (*emptypb.Empty, error) {
  448. s.serviceAccess.RLock()
  449. if s.serviceStatus.Status != ServiceStatus_STARTED {
  450. s.serviceAccess.RUnlock()
  451. return nil, os.ErrInvalid
  452. }
  453. clashServer := s.instance.clashServer
  454. s.serviceAccess.RUnlock()
  455. clashServer.(*clashapi.Server).SetMode(request.Mode)
  456. return &emptypb.Empty{}, nil
  457. }
  458. func (s *StartedService) URLTest(ctx context.Context, request *URLTestRequest) (*emptypb.Empty, error) {
  459. s.serviceAccess.RLock()
  460. if s.serviceStatus.Status != ServiceStatus_STARTED {
  461. s.serviceAccess.RUnlock()
  462. return nil, os.ErrInvalid
  463. }
  464. boxService := s.instance
  465. s.serviceAccess.RUnlock()
  466. groupTag := request.OutboundTag
  467. abstractOutboundGroup, isLoaded := boxService.instance.Outbound().Outbound(groupTag)
  468. if !isLoaded {
  469. return nil, E.New("outbound group not found: ", groupTag)
  470. }
  471. outboundGroup, isOutboundGroup := abstractOutboundGroup.(adapter.OutboundGroup)
  472. if !isOutboundGroup {
  473. return nil, E.New("outbound is not a group: ", groupTag)
  474. }
  475. urlTest, isURLTest := abstractOutboundGroup.(*group.URLTest)
  476. if isURLTest {
  477. go urlTest.CheckOutbounds()
  478. } else {
  479. var historyStorage adapter.URLTestHistoryStorage
  480. if s.instance.clashServer != nil {
  481. historyStorage = s.instance.clashServer.HistoryStorage()
  482. } else {
  483. return nil, E.New("Clash API is required for URLTest on non-URLTest group")
  484. }
  485. outbounds := common.Filter(common.Map(outboundGroup.All(), func(it string) adapter.Outbound {
  486. itOutbound, _ := boxService.instance.Outbound().Outbound(it)
  487. return itOutbound
  488. }), func(it adapter.Outbound) bool {
  489. if it == nil {
  490. return false
  491. }
  492. _, isGroup := it.(adapter.OutboundGroup)
  493. if isGroup {
  494. return false
  495. }
  496. return true
  497. })
  498. b, _ := batch.New(boxService.ctx, batch.WithConcurrencyNum[any](10))
  499. for _, detour := range outbounds {
  500. outboundToTest := detour
  501. outboundTag := outboundToTest.Tag()
  502. b.Go(outboundTag, func() (any, error) {
  503. t, err := urltest.URLTest(boxService.ctx, "", outboundToTest)
  504. if err != nil {
  505. historyStorage.DeleteURLTestHistory(outboundTag)
  506. } else {
  507. historyStorage.StoreURLTestHistory(outboundTag, &adapter.URLTestHistory{
  508. Time: time.Now(),
  509. Delay: t,
  510. })
  511. }
  512. return nil, nil
  513. })
  514. }
  515. }
  516. return &emptypb.Empty{}, nil
  517. }
  518. func (s *StartedService) SelectOutbound(ctx context.Context, request *SelectOutboundRequest) (*emptypb.Empty, error) {
  519. s.serviceAccess.RLock()
  520. switch s.serviceStatus.Status {
  521. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  522. default:
  523. s.serviceAccess.RUnlock()
  524. return nil, os.ErrInvalid
  525. }
  526. boxService := s.instance.instance
  527. s.serviceAccess.RUnlock()
  528. outboundGroup, isLoaded := boxService.Outbound().Outbound(request.GroupTag)
  529. if !isLoaded {
  530. return nil, E.New("selector not found: ", request.GroupTag)
  531. }
  532. selector, isSelector := outboundGroup.(*group.Selector)
  533. if !isSelector {
  534. return nil, E.New("outbound is not a selector: ", request.GroupTag)
  535. }
  536. if !selector.SelectOutbound(request.OutboundTag) {
  537. return nil, E.New("outbound not found in selector: ", request.OutboundTag)
  538. }
  539. return &emptypb.Empty{}, nil
  540. }
  541. func (s *StartedService) SetGroupExpand(ctx context.Context, request *SetGroupExpandRequest) (*emptypb.Empty, error) {
  542. s.serviceAccess.RLock()
  543. switch s.serviceStatus.Status {
  544. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  545. default:
  546. s.serviceAccess.RUnlock()
  547. return nil, os.ErrInvalid
  548. }
  549. boxService := s.instance
  550. s.serviceAccess.RUnlock()
  551. if boxService.cacheFile != nil {
  552. err := boxService.cacheFile.StoreGroupExpand(request.GroupTag, request.IsExpand)
  553. if err != nil {
  554. return nil, err
  555. }
  556. }
  557. return &emptypb.Empty{}, nil
  558. }
  559. func (s *StartedService) GetSystemProxyStatus(ctx context.Context, empty *emptypb.Empty) (*SystemProxyStatus, error) {
  560. return s.platformHandler.SystemProxyStatus()
  561. }
  562. func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *SetSystemProxyEnabledRequest) (*emptypb.Empty, error) {
  563. err := s.platformHandler.SetSystemProxyEnabled(request.Enabled)
  564. if err != nil {
  565. return nil, err
  566. }
  567. return nil, err
  568. }
  569. func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[Connections]) error {
  570. s.serviceAccess.RLock()
  571. switch s.serviceStatus.Status {
  572. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  573. default:
  574. s.serviceAccess.RUnlock()
  575. return os.ErrInvalid
  576. }
  577. boxService := s.instance
  578. s.serviceAccess.RUnlock()
  579. ticker := time.NewTicker(time.Duration(request.Interval))
  580. defer ticker.Stop()
  581. trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
  582. var (
  583. connections = make(map[uuid.UUID]*Connection)
  584. outConnections []*Connection
  585. )
  586. for {
  587. outConnections = outConnections[:0]
  588. for _, connection := range trafficManager.Connections() {
  589. outConnections = append(outConnections, newConnection(connections, connection, false))
  590. }
  591. for _, connection := range trafficManager.ClosedConnections() {
  592. outConnections = append(outConnections, newConnection(connections, connection, true))
  593. }
  594. err := server.Send(&Connections{Connections: outConnections})
  595. if err != nil {
  596. return err
  597. }
  598. select {
  599. case <-s.ctx.Done():
  600. return s.ctx.Err()
  601. case <-server.Context().Done():
  602. return server.Context().Err()
  603. case <-ticker.C:
  604. }
  605. }
  606. }
  607. func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol.TrackerMetadata, isClosed bool) *Connection {
  608. if oldConnection, loaded := connections[metadata.ID]; loaded {
  609. if isClosed {
  610. if oldConnection.ClosedAt == 0 {
  611. oldConnection.Uplink = 0
  612. oldConnection.Downlink = 0
  613. oldConnection.ClosedAt = metadata.ClosedAt.UnixMilli()
  614. }
  615. return oldConnection
  616. }
  617. lastUplink := oldConnection.UplinkTotal
  618. lastDownlink := oldConnection.DownlinkTotal
  619. uplinkTotal := metadata.Upload.Load()
  620. downlinkTotal := metadata.Download.Load()
  621. oldConnection.Uplink = uplinkTotal - lastUplink
  622. oldConnection.Downlink = downlinkTotal - lastDownlink
  623. oldConnection.UplinkTotal = uplinkTotal
  624. oldConnection.DownlinkTotal = downlinkTotal
  625. return oldConnection
  626. }
  627. var rule string
  628. if metadata.Rule != nil {
  629. rule = metadata.Rule.String()
  630. }
  631. uplinkTotal := metadata.Upload.Load()
  632. downlinkTotal := metadata.Download.Load()
  633. uplink := uplinkTotal
  634. downlink := downlinkTotal
  635. var closedAt int64
  636. if !metadata.ClosedAt.IsZero() {
  637. closedAt = metadata.ClosedAt.UnixMilli()
  638. uplink = 0
  639. downlink = 0
  640. }
  641. connection := &Connection{
  642. Id: metadata.ID.String(),
  643. Inbound: metadata.Metadata.Inbound,
  644. InboundType: metadata.Metadata.InboundType,
  645. IpVersion: int32(metadata.Metadata.IPVersion),
  646. Network: metadata.Metadata.Network,
  647. Source: metadata.Metadata.Source.String(),
  648. Destination: metadata.Metadata.Destination.String(),
  649. Domain: metadata.Metadata.Domain,
  650. Protocol: metadata.Metadata.Protocol,
  651. User: metadata.Metadata.User,
  652. FromOutbound: metadata.Metadata.Outbound,
  653. CreatedAt: metadata.CreatedAt.UnixMilli(),
  654. ClosedAt: closedAt,
  655. Uplink: uplink,
  656. Downlink: downlink,
  657. UplinkTotal: uplinkTotal,
  658. DownlinkTotal: downlinkTotal,
  659. Rule: rule,
  660. Outbound: metadata.Outbound,
  661. OutboundType: metadata.OutboundType,
  662. ChainList: metadata.Chain,
  663. }
  664. connections[metadata.ID] = connection
  665. return connection
  666. }
  667. func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {
  668. s.serviceAccess.RLock()
  669. switch s.serviceStatus.Status {
  670. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  671. default:
  672. s.serviceAccess.RUnlock()
  673. return nil, os.ErrInvalid
  674. }
  675. boxService := s.instance
  676. s.serviceAccess.RUnlock()
  677. targetConn := boxService.clashServer.(*clashapi.Server).TrafficManager().Connection(uuid.FromStringOrNil(request.Id))
  678. if targetConn != nil {
  679. targetConn.Close()
  680. }
  681. return &emptypb.Empty{}, nil
  682. }
  683. func (s *StartedService) CloseAllConnections(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  684. conntrack.Close()
  685. return &emptypb.Empty{}, nil
  686. }
  687. func (s *StartedService) GetDeprecatedWarnings(ctx context.Context, empty *emptypb.Empty) (*DeprecatedWarnings, error) {
  688. s.serviceAccess.RLock()
  689. if s.serviceStatus.Status != ServiceStatus_STARTED {
  690. s.serviceAccess.RUnlock()
  691. return nil, os.ErrInvalid
  692. }
  693. boxService := s.instance
  694. s.serviceAccess.RUnlock()
  695. notes := service.FromContext[deprecated.Manager](boxService.ctx).(*deprecatedManager).Get()
  696. return &DeprecatedWarnings{
  697. Warnings: common.Map(notes, func(it deprecated.Note) *DeprecatedWarning {
  698. return &DeprecatedWarning{
  699. Message: it.Message(),
  700. Impending: it.Impending(),
  701. MigrationLink: it.MigrationLink,
  702. }
  703. }),
  704. }, nil
  705. }
  706. func (s *StartedService) SubscribeHelperEvents(empty *emptypb.Empty, server grpc.ServerStreamingServer[HelperRequest]) error {
  707. return os.ErrInvalid
  708. }
  709. func (s *StartedService) SendHelperResponse(ctx context.Context, response *HelperResponse) (*emptypb.Empty, error) {
  710. return nil, os.ErrInvalid
  711. }
  712. func (s *StartedService) mustEmbedUnimplementedStartedServiceServer() {
  713. }
  714. func (s *StartedService) WriteMessage(level log.Level, message string) {
  715. item := &log.Entry{Level: level, Message: message}
  716. s.logSubscriber.Emit(item)
  717. s.logAccess.Lock()
  718. s.logLines.PushBack(item)
  719. if s.logLines.Len() > s.logMaxLines {
  720. s.logLines.Remove(s.logLines.Front())
  721. }
  722. s.logAccess.Unlock()
  723. if s.debug {
  724. s.platformHandler.WriteDebugMessage(message)
  725. }
  726. }
  727. func (s *StartedService) Instance() *Instance {
  728. s.serviceAccess.RLock()
  729. defer s.serviceAccess.RUnlock()
  730. return s.instance
  731. }