started_service.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056
  1. package daemon
  2. import (
  3. "context"
  4. "os"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "github.com/sagernet/sing-box/adapter"
  9. "github.com/sagernet/sing-box/common/urltest"
  10. "github.com/sagernet/sing-box/experimental/clashapi"
  11. "github.com/sagernet/sing-box/experimental/clashapi/trafficontrol"
  12. "github.com/sagernet/sing-box/experimental/deprecated"
  13. "github.com/sagernet/sing-box/log"
  14. "github.com/sagernet/sing-box/protocol/group"
  15. "github.com/sagernet/sing/common"
  16. "github.com/sagernet/sing/common/batch"
  17. E "github.com/sagernet/sing/common/exceptions"
  18. "github.com/sagernet/sing/common/memory"
  19. "github.com/sagernet/sing/common/observable"
  20. "github.com/sagernet/sing/common/x/list"
  21. "github.com/sagernet/sing/service"
  22. "github.com/gofrs/uuid/v5"
  23. "google.golang.org/grpc"
  24. "google.golang.org/protobuf/types/known/emptypb"
  25. )
  26. var _ StartedServiceServer = (*StartedService)(nil)
  27. type StartedService struct {
  28. ctx context.Context
  29. // platform adapter.PlatformInterface
  30. handler PlatformHandler
  31. debug bool
  32. logMaxLines int
  33. oomKiller bool
  34. // workingDirectory string
  35. // tempDirectory string
  36. // userID int
  37. // groupID int
  38. // systemProxyEnabled bool
  39. serviceAccess sync.RWMutex
  40. serviceStatus *ServiceStatus
  41. serviceStatusSubscriber *observable.Subscriber[*ServiceStatus]
  42. serviceStatusObserver *observable.Observer[*ServiceStatus]
  43. logAccess sync.RWMutex
  44. logLines list.List[*log.Entry]
  45. logSubscriber *observable.Subscriber[*log.Entry]
  46. logObserver *observable.Observer[*log.Entry]
  47. instance *Instance
  48. startedAt time.Time
  49. urlTestSubscriber *observable.Subscriber[struct{}]
  50. urlTestObserver *observable.Observer[struct{}]
  51. urlTestHistoryStorage *urltest.HistoryStorage
  52. clashModeSubscriber *observable.Subscriber[struct{}]
  53. clashModeObserver *observable.Observer[struct{}]
  54. connectionEventSubscriber *observable.Subscriber[trafficontrol.ConnectionEvent]
  55. connectionEventObserver *observable.Observer[trafficontrol.ConnectionEvent]
  56. }
  57. type ServiceOptions struct {
  58. Context context.Context
  59. // Platform adapter.PlatformInterface
  60. Handler PlatformHandler
  61. Debug bool
  62. LogMaxLines int
  63. OOMKiller bool
  64. // WorkingDirectory string
  65. // TempDirectory string
  66. // UserID int
  67. // GroupID int
  68. // SystemProxyEnabled bool
  69. }
  70. func NewStartedService(options ServiceOptions) *StartedService {
  71. s := &StartedService{
  72. ctx: options.Context,
  73. // platform: options.Platform,
  74. handler: options.Handler,
  75. debug: options.Debug,
  76. logMaxLines: options.LogMaxLines,
  77. oomKiller: options.OOMKiller,
  78. // workingDirectory: options.WorkingDirectory,
  79. // tempDirectory: options.TempDirectory,
  80. // userID: options.UserID,
  81. // groupID: options.GroupID,
  82. // systemProxyEnabled: options.SystemProxyEnabled,
  83. serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
  84. serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
  85. logSubscriber: observable.NewSubscriber[*log.Entry](128),
  86. urlTestSubscriber: observable.NewSubscriber[struct{}](1),
  87. urlTestHistoryStorage: urltest.NewHistoryStorage(),
  88. clashModeSubscriber: observable.NewSubscriber[struct{}](1),
  89. connectionEventSubscriber: observable.NewSubscriber[trafficontrol.ConnectionEvent](256),
  90. }
  91. s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
  92. s.logObserver = observable.NewObserver(s.logSubscriber, 64)
  93. s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
  94. s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
  95. s.connectionEventObserver = observable.NewObserver(s.connectionEventSubscriber, 64)
  96. return s
  97. }
  98. func (s *StartedService) resetLogs() {
  99. s.logAccess.Lock()
  100. s.logLines = list.List[*log.Entry]{}
  101. s.logAccess.Unlock()
  102. s.logSubscriber.Emit(nil)
  103. }
  104. func (s *StartedService) updateStatus(newStatus ServiceStatus_Type) {
  105. statusObject := &ServiceStatus{Status: newStatus}
  106. s.serviceStatusSubscriber.Emit(statusObject)
  107. s.serviceStatus = statusObject
  108. }
  109. func (s *StartedService) updateStatusError(err error) error {
  110. statusObject := &ServiceStatus{Status: ServiceStatus_FATAL, ErrorMessage: err.Error()}
  111. s.serviceStatusSubscriber.Emit(statusObject)
  112. s.serviceStatus = statusObject
  113. s.serviceAccess.Unlock()
  114. return err
  115. }
  116. func (s *StartedService) waitForStarted(ctx context.Context) error {
  117. s.serviceAccess.RLock()
  118. currentStatus := s.serviceStatus.Status
  119. s.serviceAccess.RUnlock()
  120. switch currentStatus {
  121. case ServiceStatus_STARTED:
  122. return nil
  123. case ServiceStatus_STARTING:
  124. default:
  125. return os.ErrInvalid
  126. }
  127. subscription, done, err := s.serviceStatusObserver.Subscribe()
  128. if err != nil {
  129. return err
  130. }
  131. defer s.serviceStatusObserver.UnSubscribe(subscription)
  132. for {
  133. select {
  134. case <-ctx.Done():
  135. return ctx.Err()
  136. case <-s.ctx.Done():
  137. return s.ctx.Err()
  138. case status := <-subscription:
  139. switch status.Status {
  140. case ServiceStatus_STARTED:
  141. return nil
  142. case ServiceStatus_FATAL:
  143. return E.New(status.ErrorMessage)
  144. case ServiceStatus_IDLE, ServiceStatus_STOPPING:
  145. return os.ErrInvalid
  146. }
  147. case <-done:
  148. return os.ErrClosed
  149. }
  150. }
  151. }
  152. func (s *StartedService) StartOrReloadService(profileContent string, options *OverrideOptions) error {
  153. s.serviceAccess.Lock()
  154. switch s.serviceStatus.Status {
  155. case ServiceStatus_IDLE, ServiceStatus_STARTED, ServiceStatus_STARTING:
  156. default:
  157. s.serviceAccess.Unlock()
  158. return os.ErrInvalid
  159. }
  160. oldInstance := s.instance
  161. if oldInstance != nil {
  162. s.updateStatus(ServiceStatus_STOPPING)
  163. s.serviceAccess.Unlock()
  164. _ = oldInstance.Close()
  165. s.serviceAccess.Lock()
  166. }
  167. s.updateStatus(ServiceStatus_STARTING)
  168. s.resetLogs()
  169. instance, err := s.newInstance(profileContent, options)
  170. if err != nil {
  171. return s.updateStatusError(err)
  172. }
  173. s.instance = instance
  174. instance.urlTestHistoryStorage.SetHook(s.urlTestSubscriber)
  175. if instance.clashServer != nil {
  176. instance.clashServer.SetModeUpdateHook(s.clashModeSubscriber)
  177. instance.clashServer.(*clashapi.Server).TrafficManager().SetEventHook(s.connectionEventSubscriber)
  178. }
  179. s.serviceAccess.Unlock()
  180. err = instance.Start()
  181. s.serviceAccess.Lock()
  182. if s.serviceStatus.Status != ServiceStatus_STARTING {
  183. s.serviceAccess.Unlock()
  184. return nil
  185. }
  186. if err != nil {
  187. return s.updateStatusError(err)
  188. }
  189. s.startedAt = time.Now()
  190. s.updateStatus(ServiceStatus_STARTED)
  191. s.serviceAccess.Unlock()
  192. runtime.GC()
  193. return nil
  194. }
  195. func (s *StartedService) Close() {
  196. s.serviceStatusSubscriber.Close()
  197. s.logSubscriber.Close()
  198. s.urlTestSubscriber.Close()
  199. s.clashModeSubscriber.Close()
  200. s.connectionEventSubscriber.Close()
  201. }
  202. func (s *StartedService) CloseService() error {
  203. s.serviceAccess.Lock()
  204. switch s.serviceStatus.Status {
  205. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  206. default:
  207. s.serviceAccess.Unlock()
  208. return os.ErrInvalid
  209. }
  210. s.updateStatus(ServiceStatus_STOPPING)
  211. if s.instance != nil {
  212. err := s.instance.Close()
  213. if err != nil {
  214. return s.updateStatusError(err)
  215. }
  216. }
  217. s.instance = nil
  218. s.startedAt = time.Time{}
  219. s.updateStatus(ServiceStatus_IDLE)
  220. s.serviceAccess.Unlock()
  221. runtime.GC()
  222. return nil
  223. }
  224. func (s *StartedService) SetError(err error) {
  225. s.serviceAccess.Lock()
  226. s.updateStatusError(err)
  227. s.WriteMessage(log.LevelError, err.Error())
  228. }
  229. func (s *StartedService) StopService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  230. err := s.handler.ServiceStop()
  231. if err != nil {
  232. return nil, err
  233. }
  234. return &emptypb.Empty{}, nil
  235. }
  236. func (s *StartedService) ReloadService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  237. err := s.handler.ServiceReload()
  238. if err != nil {
  239. return nil, err
  240. }
  241. return &emptypb.Empty{}, nil
  242. }
  243. func (s *StartedService) SubscribeServiceStatus(empty *emptypb.Empty, server grpc.ServerStreamingServer[ServiceStatus]) error {
  244. subscription, done, err := s.serviceStatusObserver.Subscribe()
  245. if err != nil {
  246. return err
  247. }
  248. defer s.serviceStatusObserver.UnSubscribe(subscription)
  249. err = server.Send(s.serviceStatus)
  250. if err != nil {
  251. return err
  252. }
  253. for {
  254. select {
  255. case <-s.ctx.Done():
  256. return s.ctx.Err()
  257. case <-server.Context().Done():
  258. return server.Context().Err()
  259. case newStatus := <-subscription:
  260. err = server.Send(newStatus)
  261. if err != nil {
  262. return err
  263. }
  264. case <-done:
  265. return nil
  266. }
  267. }
  268. }
  269. func (s *StartedService) SubscribeLog(empty *emptypb.Empty, server grpc.ServerStreamingServer[Log]) error {
  270. var savedLines []*log.Entry
  271. s.logAccess.Lock()
  272. savedLines = make([]*log.Entry, 0, s.logLines.Len())
  273. for element := s.logLines.Front(); element != nil; element = element.Next() {
  274. savedLines = append(savedLines, element.Value)
  275. }
  276. subscription, done, err := s.logObserver.Subscribe()
  277. s.logAccess.Unlock()
  278. if err != nil {
  279. return err
  280. }
  281. defer s.logObserver.UnSubscribe(subscription)
  282. err = server.Send(&Log{
  283. Messages: common.Map(savedLines, func(it *log.Entry) *Log_Message {
  284. return &Log_Message{
  285. Level: LogLevel(it.Level),
  286. Message: it.Message,
  287. }
  288. }),
  289. Reset_: true,
  290. })
  291. if err != nil {
  292. return err
  293. }
  294. for {
  295. select {
  296. case <-s.ctx.Done():
  297. return s.ctx.Err()
  298. case <-server.Context().Done():
  299. return server.Context().Err()
  300. case message := <-subscription:
  301. var rawMessage Log
  302. if message == nil {
  303. rawMessage.Reset_ = true
  304. } else {
  305. rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
  306. Level: LogLevel(message.Level),
  307. Message: message.Message,
  308. })
  309. }
  310. fetch:
  311. for {
  312. select {
  313. case message = <-subscription:
  314. if message == nil {
  315. rawMessage.Messages = nil
  316. rawMessage.Reset_ = true
  317. } else {
  318. rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
  319. Level: LogLevel(message.Level),
  320. Message: message.Message,
  321. })
  322. }
  323. default:
  324. break fetch
  325. }
  326. }
  327. err = server.Send(&rawMessage)
  328. if err != nil {
  329. return err
  330. }
  331. case <-done:
  332. return nil
  333. }
  334. }
  335. }
  336. func (s *StartedService) GetDefaultLogLevel(ctx context.Context, empty *emptypb.Empty) (*DefaultLogLevel, error) {
  337. s.serviceAccess.RLock()
  338. switch s.serviceStatus.Status {
  339. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  340. default:
  341. s.serviceAccess.RUnlock()
  342. return nil, os.ErrInvalid
  343. }
  344. logLevel := s.instance.instance.LogFactory().Level()
  345. s.serviceAccess.RUnlock()
  346. return &DefaultLogLevel{Level: LogLevel(logLevel)}, nil
  347. }
  348. func (s *StartedService) ClearLogs(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  349. s.resetLogs()
  350. return &emptypb.Empty{}, nil
  351. }
  352. func (s *StartedService) SubscribeStatus(request *SubscribeStatusRequest, server grpc.ServerStreamingServer[Status]) error {
  353. interval := time.Duration(request.Interval)
  354. if interval <= 0 {
  355. interval = time.Second // Default to 1 second
  356. }
  357. ticker := time.NewTicker(interval)
  358. defer ticker.Stop()
  359. status := s.readStatus()
  360. uploadTotal := status.UplinkTotal
  361. downloadTotal := status.DownlinkTotal
  362. for {
  363. err := server.Send(status)
  364. if err != nil {
  365. return err
  366. }
  367. select {
  368. case <-s.ctx.Done():
  369. return s.ctx.Err()
  370. case <-server.Context().Done():
  371. return server.Context().Err()
  372. case <-ticker.C:
  373. }
  374. status = s.readStatus()
  375. upload := status.UplinkTotal - uploadTotal
  376. download := status.DownlinkTotal - downloadTotal
  377. uploadTotal = status.UplinkTotal
  378. downloadTotal = status.DownlinkTotal
  379. status.Uplink = upload
  380. status.Downlink = download
  381. }
  382. }
  383. func (s *StartedService) readStatus() *Status {
  384. var status Status
  385. status.Memory = memory.Total()
  386. status.Goroutines = int32(runtime.NumGoroutine())
  387. s.serviceAccess.RLock()
  388. nowService := s.instance
  389. s.serviceAccess.RUnlock()
  390. if nowService != nil && nowService.connectionManager != nil {
  391. status.ConnectionsOut = int32(nowService.connectionManager.Count())
  392. }
  393. if nowService != nil {
  394. if clashServer := nowService.clashServer; clashServer != nil {
  395. status.TrafficAvailable = true
  396. trafficManager := clashServer.(*clashapi.Server).TrafficManager()
  397. status.UplinkTotal, status.DownlinkTotal = trafficManager.Total()
  398. status.ConnectionsIn = int32(trafficManager.ConnectionsLen())
  399. }
  400. }
  401. return &status
  402. }
  403. func (s *StartedService) SubscribeGroups(empty *emptypb.Empty, server grpc.ServerStreamingServer[Groups]) error {
  404. err := s.waitForStarted(server.Context())
  405. if err != nil {
  406. return err
  407. }
  408. subscription, done, err := s.urlTestObserver.Subscribe()
  409. if err != nil {
  410. return err
  411. }
  412. defer s.urlTestObserver.UnSubscribe(subscription)
  413. for {
  414. s.serviceAccess.RLock()
  415. if s.serviceStatus.Status != ServiceStatus_STARTED {
  416. s.serviceAccess.RUnlock()
  417. return os.ErrInvalid
  418. }
  419. groups := s.readGroups()
  420. s.serviceAccess.RUnlock()
  421. err = server.Send(groups)
  422. if err != nil {
  423. return err
  424. }
  425. select {
  426. case <-subscription:
  427. case <-s.ctx.Done():
  428. return s.ctx.Err()
  429. case <-server.Context().Done():
  430. return server.Context().Err()
  431. case <-done:
  432. return nil
  433. }
  434. }
  435. }
  436. func (s *StartedService) readGroups() *Groups {
  437. historyStorage := s.instance.urlTestHistoryStorage
  438. boxService := s.instance
  439. outbounds := boxService.instance.Outbound().Outbounds()
  440. var iGroups []adapter.OutboundGroup
  441. for _, it := range outbounds {
  442. if group, isGroup := it.(adapter.OutboundGroup); isGroup {
  443. iGroups = append(iGroups, group)
  444. }
  445. }
  446. var gs Groups
  447. for _, iGroup := range iGroups {
  448. var g Group
  449. g.Tag = iGroup.Tag()
  450. g.Type = iGroup.Type()
  451. _, g.Selectable = iGroup.(*group.Selector)
  452. g.Selected = iGroup.Now()
  453. if boxService.cacheFile != nil {
  454. if isExpand, loaded := boxService.cacheFile.LoadGroupExpand(g.Tag); loaded {
  455. g.IsExpand = isExpand
  456. }
  457. }
  458. for _, itemTag := range iGroup.All() {
  459. itemOutbound, isLoaded := boxService.instance.Outbound().Outbound(itemTag)
  460. if !isLoaded {
  461. continue
  462. }
  463. var item GroupItem
  464. item.Tag = itemTag
  465. item.Type = itemOutbound.Type()
  466. if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(itemOutbound)); history != nil {
  467. item.UrlTestTime = history.Time.Unix()
  468. item.UrlTestDelay = int32(history.Delay)
  469. }
  470. g.Items = append(g.Items, &item)
  471. }
  472. if len(g.Items) < 2 {
  473. continue
  474. }
  475. gs.Group = append(gs.Group, &g)
  476. }
  477. return &gs
  478. }
  479. func (s *StartedService) GetClashModeStatus(ctx context.Context, empty *emptypb.Empty) (*ClashModeStatus, error) {
  480. s.serviceAccess.RLock()
  481. if s.serviceStatus.Status != ServiceStatus_STARTED {
  482. s.serviceAccess.RUnlock()
  483. return nil, os.ErrInvalid
  484. }
  485. clashServer := s.instance.clashServer
  486. s.serviceAccess.RUnlock()
  487. if clashServer == nil {
  488. return nil, os.ErrInvalid
  489. }
  490. return &ClashModeStatus{
  491. ModeList: clashServer.ModeList(),
  492. CurrentMode: clashServer.Mode(),
  493. }, nil
  494. }
  495. func (s *StartedService) SubscribeClashMode(empty *emptypb.Empty, server grpc.ServerStreamingServer[ClashMode]) error {
  496. err := s.waitForStarted(server.Context())
  497. if err != nil {
  498. return err
  499. }
  500. subscription, done, err := s.clashModeObserver.Subscribe()
  501. if err != nil {
  502. return err
  503. }
  504. defer s.clashModeObserver.UnSubscribe(subscription)
  505. for {
  506. s.serviceAccess.RLock()
  507. if s.serviceStatus.Status != ServiceStatus_STARTED {
  508. s.serviceAccess.RUnlock()
  509. return os.ErrInvalid
  510. }
  511. message := &ClashMode{Mode: s.instance.clashServer.Mode()}
  512. s.serviceAccess.RUnlock()
  513. err = server.Send(message)
  514. if err != nil {
  515. return err
  516. }
  517. select {
  518. case <-subscription:
  519. case <-s.ctx.Done():
  520. return s.ctx.Err()
  521. case <-server.Context().Done():
  522. return server.Context().Err()
  523. case <-done:
  524. return nil
  525. }
  526. }
  527. }
  528. func (s *StartedService) SetClashMode(ctx context.Context, request *ClashMode) (*emptypb.Empty, error) {
  529. s.serviceAccess.RLock()
  530. if s.serviceStatus.Status != ServiceStatus_STARTED {
  531. s.serviceAccess.RUnlock()
  532. return nil, os.ErrInvalid
  533. }
  534. clashServer := s.instance.clashServer
  535. s.serviceAccess.RUnlock()
  536. clashServer.(*clashapi.Server).SetMode(request.Mode)
  537. return &emptypb.Empty{}, nil
  538. }
  539. func (s *StartedService) URLTest(ctx context.Context, request *URLTestRequest) (*emptypb.Empty, error) {
  540. s.serviceAccess.RLock()
  541. if s.serviceStatus.Status != ServiceStatus_STARTED {
  542. s.serviceAccess.RUnlock()
  543. return nil, os.ErrInvalid
  544. }
  545. boxService := s.instance
  546. s.serviceAccess.RUnlock()
  547. groupTag := request.OutboundTag
  548. abstractOutboundGroup, isLoaded := boxService.instance.Outbound().Outbound(groupTag)
  549. if !isLoaded {
  550. return nil, E.New("outbound group not found: ", groupTag)
  551. }
  552. outboundGroup, isOutboundGroup := abstractOutboundGroup.(adapter.OutboundGroup)
  553. if !isOutboundGroup {
  554. return nil, E.New("outbound is not a group: ", groupTag)
  555. }
  556. urlTest, isURLTest := abstractOutboundGroup.(*group.URLTest)
  557. if isURLTest {
  558. go urlTest.CheckOutbounds()
  559. } else {
  560. historyStorage := boxService.urlTestHistoryStorage
  561. outbounds := common.Filter(common.Map(outboundGroup.All(), func(it string) adapter.Outbound {
  562. itOutbound, _ := boxService.instance.Outbound().Outbound(it)
  563. return itOutbound
  564. }), func(it adapter.Outbound) bool {
  565. if it == nil {
  566. return false
  567. }
  568. _, isGroup := it.(adapter.OutboundGroup)
  569. if isGroup {
  570. return false
  571. }
  572. return true
  573. })
  574. b, _ := batch.New(boxService.ctx, batch.WithConcurrencyNum[any](10))
  575. for _, detour := range outbounds {
  576. outboundToTest := detour
  577. outboundTag := outboundToTest.Tag()
  578. b.Go(outboundTag, func() (any, error) {
  579. t, err := urltest.URLTest(boxService.ctx, "", outboundToTest)
  580. if err != nil {
  581. historyStorage.DeleteURLTestHistory(outboundTag)
  582. } else {
  583. historyStorage.StoreURLTestHistory(outboundTag, &adapter.URLTestHistory{
  584. Time: time.Now(),
  585. Delay: t,
  586. })
  587. }
  588. return nil, nil
  589. })
  590. }
  591. }
  592. return &emptypb.Empty{}, nil
  593. }
  594. func (s *StartedService) SelectOutbound(ctx context.Context, request *SelectOutboundRequest) (*emptypb.Empty, error) {
  595. s.serviceAccess.RLock()
  596. switch s.serviceStatus.Status {
  597. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  598. default:
  599. s.serviceAccess.RUnlock()
  600. return nil, os.ErrInvalid
  601. }
  602. boxService := s.instance.instance
  603. s.serviceAccess.RUnlock()
  604. outboundGroup, isLoaded := boxService.Outbound().Outbound(request.GroupTag)
  605. if !isLoaded {
  606. return nil, E.New("selector not found: ", request.GroupTag)
  607. }
  608. selector, isSelector := outboundGroup.(*group.Selector)
  609. if !isSelector {
  610. return nil, E.New("outbound is not a selector: ", request.GroupTag)
  611. }
  612. if !selector.SelectOutbound(request.OutboundTag) {
  613. return nil, E.New("outbound not found in selector: ", request.OutboundTag)
  614. }
  615. s.urlTestObserver.Emit(struct{}{})
  616. return &emptypb.Empty{}, nil
  617. }
  618. func (s *StartedService) SetGroupExpand(ctx context.Context, request *SetGroupExpandRequest) (*emptypb.Empty, error) {
  619. s.serviceAccess.RLock()
  620. switch s.serviceStatus.Status {
  621. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  622. default:
  623. s.serviceAccess.RUnlock()
  624. return nil, os.ErrInvalid
  625. }
  626. boxService := s.instance
  627. s.serviceAccess.RUnlock()
  628. if boxService.cacheFile != nil {
  629. err := boxService.cacheFile.StoreGroupExpand(request.GroupTag, request.IsExpand)
  630. if err != nil {
  631. return nil, err
  632. }
  633. }
  634. return &emptypb.Empty{}, nil
  635. }
  636. func (s *StartedService) GetSystemProxyStatus(ctx context.Context, empty *emptypb.Empty) (*SystemProxyStatus, error) {
  637. return s.handler.SystemProxyStatus()
  638. }
  639. func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *SetSystemProxyEnabledRequest) (*emptypb.Empty, error) {
  640. err := s.handler.SetSystemProxyEnabled(request.Enabled)
  641. if err != nil {
  642. return nil, err
  643. }
  644. return nil, err
  645. }
  646. func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[ConnectionEvents]) error {
  647. err := s.waitForStarted(server.Context())
  648. if err != nil {
  649. return err
  650. }
  651. s.serviceAccess.RLock()
  652. boxService := s.instance
  653. s.serviceAccess.RUnlock()
  654. if boxService.clashServer == nil {
  655. return E.New("clash server not available")
  656. }
  657. trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
  658. subscription, done, err := s.connectionEventObserver.Subscribe()
  659. if err != nil {
  660. return err
  661. }
  662. defer s.connectionEventObserver.UnSubscribe(subscription)
  663. connectionSnapshots := make(map[uuid.UUID]connectionSnapshot)
  664. initialEvents := s.buildInitialConnectionState(trafficManager, connectionSnapshots)
  665. err = server.Send(&ConnectionEvents{
  666. Events: initialEvents,
  667. Reset_: true,
  668. })
  669. if err != nil {
  670. return err
  671. }
  672. interval := time.Duration(request.Interval)
  673. if interval <= 0 {
  674. interval = time.Second
  675. }
  676. ticker := time.NewTicker(interval)
  677. defer ticker.Stop()
  678. for {
  679. select {
  680. case <-s.ctx.Done():
  681. return s.ctx.Err()
  682. case <-server.Context().Done():
  683. return server.Context().Err()
  684. case <-done:
  685. return nil
  686. case event := <-subscription:
  687. var pendingEvents []*ConnectionEvent
  688. if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
  689. pendingEvents = append(pendingEvents, protoEvent)
  690. }
  691. drain:
  692. for {
  693. select {
  694. case event = <-subscription:
  695. if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
  696. pendingEvents = append(pendingEvents, protoEvent)
  697. }
  698. default:
  699. break drain
  700. }
  701. }
  702. if len(pendingEvents) > 0 {
  703. err = server.Send(&ConnectionEvents{Events: pendingEvents})
  704. if err != nil {
  705. return err
  706. }
  707. }
  708. case <-ticker.C:
  709. protoEvents := s.buildTrafficUpdates(trafficManager, connectionSnapshots)
  710. if len(protoEvents) == 0 {
  711. continue
  712. }
  713. err = server.Send(&ConnectionEvents{Events: protoEvents})
  714. if err != nil {
  715. return err
  716. }
  717. }
  718. }
  719. }
  720. type connectionSnapshot struct {
  721. uplink int64
  722. downlink int64
  723. hadTraffic bool
  724. }
  725. func (s *StartedService) buildInitialConnectionState(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
  726. var events []*ConnectionEvent
  727. for _, metadata := range manager.Connections() {
  728. events = append(events, &ConnectionEvent{
  729. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  730. Id: metadata.ID.String(),
  731. Connection: buildConnectionProto(metadata),
  732. })
  733. snapshots[metadata.ID] = connectionSnapshot{
  734. uplink: metadata.Upload.Load(),
  735. downlink: metadata.Download.Load(),
  736. }
  737. }
  738. for _, metadata := range manager.ClosedConnections() {
  739. conn := buildConnectionProto(metadata)
  740. conn.ClosedAt = metadata.ClosedAt.UnixMilli()
  741. events = append(events, &ConnectionEvent{
  742. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  743. Id: metadata.ID.String(),
  744. Connection: conn,
  745. })
  746. }
  747. return events
  748. }
  749. func (s *StartedService) applyConnectionEvent(event trafficontrol.ConnectionEvent, snapshots map[uuid.UUID]connectionSnapshot) *ConnectionEvent {
  750. switch event.Type {
  751. case trafficontrol.ConnectionEventNew:
  752. if _, exists := snapshots[event.ID]; exists {
  753. return nil
  754. }
  755. snapshots[event.ID] = connectionSnapshot{
  756. uplink: event.Metadata.Upload.Load(),
  757. downlink: event.Metadata.Download.Load(),
  758. }
  759. return &ConnectionEvent{
  760. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  761. Id: event.ID.String(),
  762. Connection: buildConnectionProto(event.Metadata),
  763. }
  764. case trafficontrol.ConnectionEventClosed:
  765. delete(snapshots, event.ID)
  766. protoEvent := &ConnectionEvent{
  767. Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
  768. Id: event.ID.String(),
  769. }
  770. closedAt := event.ClosedAt
  771. if closedAt.IsZero() && !event.Metadata.ClosedAt.IsZero() {
  772. closedAt = event.Metadata.ClosedAt
  773. }
  774. if closedAt.IsZero() {
  775. closedAt = time.Now()
  776. }
  777. protoEvent.ClosedAt = closedAt.UnixMilli()
  778. if event.Metadata.ID != uuid.Nil {
  779. conn := buildConnectionProto(event.Metadata)
  780. conn.ClosedAt = protoEvent.ClosedAt
  781. protoEvent.Connection = conn
  782. }
  783. return protoEvent
  784. default:
  785. return nil
  786. }
  787. }
  788. func (s *StartedService) buildTrafficUpdates(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
  789. activeConnections := manager.Connections()
  790. activeIndex := make(map[uuid.UUID]*trafficontrol.TrackerMetadata, len(activeConnections))
  791. var events []*ConnectionEvent
  792. for _, metadata := range activeConnections {
  793. activeIndex[metadata.ID] = metadata
  794. currentUpload := metadata.Upload.Load()
  795. currentDownload := metadata.Download.Load()
  796. snapshot, exists := snapshots[metadata.ID]
  797. if !exists {
  798. snapshots[metadata.ID] = connectionSnapshot{
  799. uplink: currentUpload,
  800. downlink: currentDownload,
  801. }
  802. events = append(events, &ConnectionEvent{
  803. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  804. Id: metadata.ID.String(),
  805. Connection: buildConnectionProto(metadata),
  806. })
  807. continue
  808. }
  809. uplinkDelta := currentUpload - snapshot.uplink
  810. downlinkDelta := currentDownload - snapshot.downlink
  811. if uplinkDelta < 0 || downlinkDelta < 0 {
  812. if snapshot.hadTraffic {
  813. events = append(events, &ConnectionEvent{
  814. Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
  815. Id: metadata.ID.String(),
  816. UplinkDelta: 0,
  817. DownlinkDelta: 0,
  818. })
  819. }
  820. snapshot.uplink = currentUpload
  821. snapshot.downlink = currentDownload
  822. snapshot.hadTraffic = false
  823. snapshots[metadata.ID] = snapshot
  824. continue
  825. }
  826. if uplinkDelta > 0 || downlinkDelta > 0 {
  827. snapshot.uplink = currentUpload
  828. snapshot.downlink = currentDownload
  829. snapshot.hadTraffic = true
  830. snapshots[metadata.ID] = snapshot
  831. events = append(events, &ConnectionEvent{
  832. Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
  833. Id: metadata.ID.String(),
  834. UplinkDelta: uplinkDelta,
  835. DownlinkDelta: downlinkDelta,
  836. })
  837. continue
  838. }
  839. if snapshot.hadTraffic {
  840. snapshot.uplink = currentUpload
  841. snapshot.downlink = currentDownload
  842. snapshot.hadTraffic = false
  843. snapshots[metadata.ID] = snapshot
  844. events = append(events, &ConnectionEvent{
  845. Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
  846. Id: metadata.ID.String(),
  847. UplinkDelta: 0,
  848. DownlinkDelta: 0,
  849. })
  850. }
  851. }
  852. var closedIndex map[uuid.UUID]*trafficontrol.TrackerMetadata
  853. for id := range snapshots {
  854. if _, exists := activeIndex[id]; exists {
  855. continue
  856. }
  857. if closedIndex == nil {
  858. closedIndex = make(map[uuid.UUID]*trafficontrol.TrackerMetadata)
  859. for _, metadata := range manager.ClosedConnections() {
  860. closedIndex[metadata.ID] = metadata
  861. }
  862. }
  863. closedAt := time.Now()
  864. var conn *Connection
  865. if metadata, ok := closedIndex[id]; ok {
  866. if !metadata.ClosedAt.IsZero() {
  867. closedAt = metadata.ClosedAt
  868. }
  869. conn = buildConnectionProto(metadata)
  870. conn.ClosedAt = closedAt.UnixMilli()
  871. }
  872. events = append(events, &ConnectionEvent{
  873. Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
  874. Id: id.String(),
  875. ClosedAt: closedAt.UnixMilli(),
  876. Connection: conn,
  877. })
  878. delete(snapshots, id)
  879. }
  880. return events
  881. }
  882. func buildConnectionProto(metadata *trafficontrol.TrackerMetadata) *Connection {
  883. var rule string
  884. if metadata.Rule != nil {
  885. rule = metadata.Rule.String()
  886. }
  887. uplinkTotal := metadata.Upload.Load()
  888. downlinkTotal := metadata.Download.Load()
  889. var processInfo *ProcessInfo
  890. if metadata.Metadata.ProcessInfo != nil {
  891. processInfo = &ProcessInfo{
  892. ProcessId: metadata.Metadata.ProcessInfo.ProcessID,
  893. UserId: metadata.Metadata.ProcessInfo.UserId,
  894. UserName: metadata.Metadata.ProcessInfo.UserName,
  895. ProcessPath: metadata.Metadata.ProcessInfo.ProcessPath,
  896. PackageName: metadata.Metadata.ProcessInfo.AndroidPackageName,
  897. }
  898. }
  899. return &Connection{
  900. Id: metadata.ID.String(),
  901. Inbound: metadata.Metadata.Inbound,
  902. InboundType: metadata.Metadata.InboundType,
  903. IpVersion: int32(metadata.Metadata.IPVersion),
  904. Network: metadata.Metadata.Network,
  905. Source: metadata.Metadata.Source.String(),
  906. Destination: metadata.Metadata.Destination.String(),
  907. Domain: metadata.Metadata.Domain,
  908. Protocol: metadata.Metadata.Protocol,
  909. User: metadata.Metadata.User,
  910. FromOutbound: metadata.Metadata.Outbound,
  911. CreatedAt: metadata.CreatedAt.UnixMilli(),
  912. UplinkTotal: uplinkTotal,
  913. DownlinkTotal: downlinkTotal,
  914. Rule: rule,
  915. Outbound: metadata.Outbound,
  916. OutboundType: metadata.OutboundType,
  917. ChainList: metadata.Chain,
  918. ProcessInfo: processInfo,
  919. }
  920. }
  921. func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {
  922. s.serviceAccess.RLock()
  923. switch s.serviceStatus.Status {
  924. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  925. default:
  926. s.serviceAccess.RUnlock()
  927. return nil, os.ErrInvalid
  928. }
  929. boxService := s.instance
  930. s.serviceAccess.RUnlock()
  931. targetConn := boxService.clashServer.(*clashapi.Server).TrafficManager().Connection(uuid.FromStringOrNil(request.Id))
  932. if targetConn != nil {
  933. targetConn.Close()
  934. }
  935. return &emptypb.Empty{}, nil
  936. }
  937. func (s *StartedService) CloseAllConnections(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  938. s.serviceAccess.RLock()
  939. nowService := s.instance
  940. s.serviceAccess.RUnlock()
  941. if nowService != nil && nowService.connectionManager != nil {
  942. nowService.connectionManager.CloseAll()
  943. }
  944. return &emptypb.Empty{}, nil
  945. }
  946. func (s *StartedService) GetDeprecatedWarnings(ctx context.Context, empty *emptypb.Empty) (*DeprecatedWarnings, error) {
  947. s.serviceAccess.RLock()
  948. if s.serviceStatus.Status != ServiceStatus_STARTED {
  949. s.serviceAccess.RUnlock()
  950. return nil, os.ErrInvalid
  951. }
  952. boxService := s.instance
  953. s.serviceAccess.RUnlock()
  954. notes := service.FromContext[deprecated.Manager](boxService.ctx).(*deprecatedManager).Get()
  955. return &DeprecatedWarnings{
  956. Warnings: common.Map(notes, func(it deprecated.Note) *DeprecatedWarning {
  957. return &DeprecatedWarning{
  958. Message: it.Message(),
  959. Impending: it.Impending(),
  960. MigrationLink: it.MigrationLink,
  961. }
  962. }),
  963. }, nil
  964. }
  965. func (s *StartedService) GetStartedAt(ctx context.Context, empty *emptypb.Empty) (*StartedAt, error) {
  966. s.serviceAccess.RLock()
  967. defer s.serviceAccess.RUnlock()
  968. return &StartedAt{StartedAt: s.startedAt.UnixMilli()}, nil
  969. }
  970. func (s *StartedService) mustEmbedUnimplementedStartedServiceServer() {
  971. }
  972. func (s *StartedService) WriteMessage(level log.Level, message string) {
  973. item := &log.Entry{Level: level, Message: message}
  974. s.logAccess.Lock()
  975. s.logLines.PushBack(item)
  976. if s.logLines.Len() > s.logMaxLines {
  977. s.logLines.Remove(s.logLines.Front())
  978. }
  979. s.logAccess.Unlock()
  980. s.logSubscriber.Emit(item)
  981. if s.debug {
  982. s.handler.WriteDebugMessage(message)
  983. }
  984. }
  985. func (s *StartedService) Instance() *Instance {
  986. s.serviceAccess.RLock()
  987. defer s.serviceAccess.RUnlock()
  988. return s.instance
  989. }