started_service.go 30 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057
  1. package daemon
  2. import (
  3. "context"
  4. "os"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "github.com/sagernet/sing-box/adapter"
  9. "github.com/sagernet/sing-box/common/urltest"
  10. "github.com/sagernet/sing-box/experimental/clashapi"
  11. "github.com/sagernet/sing-box/experimental/clashapi/trafficontrol"
  12. "github.com/sagernet/sing-box/experimental/deprecated"
  13. "github.com/sagernet/sing-box/log"
  14. "github.com/sagernet/sing-box/protocol/group"
  15. "github.com/sagernet/sing/common"
  16. "github.com/sagernet/sing/common/batch"
  17. E "github.com/sagernet/sing/common/exceptions"
  18. "github.com/sagernet/sing/common/memory"
  19. "github.com/sagernet/sing/common/observable"
  20. "github.com/sagernet/sing/common/x/list"
  21. "github.com/sagernet/sing/service"
  22. "github.com/gofrs/uuid/v5"
  23. "google.golang.org/grpc"
  24. "google.golang.org/protobuf/types/known/emptypb"
  25. )
  26. var _ StartedServiceServer = (*StartedService)(nil)
  27. type StartedService struct {
  28. ctx context.Context
  29. // platform adapter.PlatformInterface
  30. handler PlatformHandler
  31. debug bool
  32. logMaxLines int
  33. oomKiller bool
  34. // workingDirectory string
  35. // tempDirectory string
  36. // userID int
  37. // groupID int
  38. // systemProxyEnabled bool
  39. serviceAccess sync.RWMutex
  40. serviceStatus *ServiceStatus
  41. serviceStatusSubscriber *observable.Subscriber[*ServiceStatus]
  42. serviceStatusObserver *observable.Observer[*ServiceStatus]
  43. logAccess sync.RWMutex
  44. logLines list.List[*log.Entry]
  45. logSubscriber *observable.Subscriber[*log.Entry]
  46. logObserver *observable.Observer[*log.Entry]
  47. instance *Instance
  48. startedAt time.Time
  49. urlTestSubscriber *observable.Subscriber[struct{}]
  50. urlTestObserver *observable.Observer[struct{}]
  51. urlTestHistoryStorage *urltest.HistoryStorage
  52. clashModeSubscriber *observable.Subscriber[struct{}]
  53. clashModeObserver *observable.Observer[struct{}]
  54. connectionEventSubscriber *observable.Subscriber[trafficontrol.ConnectionEvent]
  55. connectionEventObserver *observable.Observer[trafficontrol.ConnectionEvent]
  56. }
  57. type ServiceOptions struct {
  58. Context context.Context
  59. // Platform adapter.PlatformInterface
  60. Handler PlatformHandler
  61. Debug bool
  62. LogMaxLines int
  63. OOMKiller bool
  64. // WorkingDirectory string
  65. // TempDirectory string
  66. // UserID int
  67. // GroupID int
  68. // SystemProxyEnabled bool
  69. }
  70. func NewStartedService(options ServiceOptions) *StartedService {
  71. s := &StartedService{
  72. ctx: options.Context,
  73. // platform: options.Platform,
  74. handler: options.Handler,
  75. debug: options.Debug,
  76. logMaxLines: options.LogMaxLines,
  77. oomKiller: options.OOMKiller,
  78. // workingDirectory: options.WorkingDirectory,
  79. // tempDirectory: options.TempDirectory,
  80. // userID: options.UserID,
  81. // groupID: options.GroupID,
  82. // systemProxyEnabled: options.SystemProxyEnabled,
  83. serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
  84. serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
  85. logSubscriber: observable.NewSubscriber[*log.Entry](128),
  86. urlTestSubscriber: observable.NewSubscriber[struct{}](1),
  87. urlTestHistoryStorage: urltest.NewHistoryStorage(),
  88. clashModeSubscriber: observable.NewSubscriber[struct{}](1),
  89. connectionEventSubscriber: observable.NewSubscriber[trafficontrol.ConnectionEvent](256),
  90. }
  91. s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
  92. s.logObserver = observable.NewObserver(s.logSubscriber, 64)
  93. s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
  94. s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
  95. s.connectionEventObserver = observable.NewObserver(s.connectionEventSubscriber, 64)
  96. return s
  97. }
  98. func (s *StartedService) resetLogs() {
  99. s.logAccess.Lock()
  100. s.logLines = list.List[*log.Entry]{}
  101. s.logAccess.Unlock()
  102. s.logSubscriber.Emit(nil)
  103. }
  104. func (s *StartedService) updateStatus(newStatus ServiceStatus_Type) {
  105. statusObject := &ServiceStatus{Status: newStatus}
  106. s.serviceStatusSubscriber.Emit(statusObject)
  107. s.serviceStatus = statusObject
  108. }
  109. func (s *StartedService) updateStatusError(err error) error {
  110. statusObject := &ServiceStatus{Status: ServiceStatus_FATAL, ErrorMessage: err.Error()}
  111. s.serviceStatusSubscriber.Emit(statusObject)
  112. s.serviceStatus = statusObject
  113. s.serviceAccess.Unlock()
  114. return err
  115. }
  116. func (s *StartedService) waitForStarted(ctx context.Context) error {
  117. s.serviceAccess.RLock()
  118. currentStatus := s.serviceStatus.Status
  119. s.serviceAccess.RUnlock()
  120. switch currentStatus {
  121. case ServiceStatus_STARTED:
  122. return nil
  123. case ServiceStatus_STARTING:
  124. default:
  125. return os.ErrInvalid
  126. }
  127. subscription, done, err := s.serviceStatusObserver.Subscribe()
  128. if err != nil {
  129. return err
  130. }
  131. defer s.serviceStatusObserver.UnSubscribe(subscription)
  132. for {
  133. select {
  134. case <-ctx.Done():
  135. return ctx.Err()
  136. case <-s.ctx.Done():
  137. return s.ctx.Err()
  138. case status := <-subscription:
  139. switch status.Status {
  140. case ServiceStatus_STARTED:
  141. return nil
  142. case ServiceStatus_FATAL:
  143. return E.New(status.ErrorMessage)
  144. case ServiceStatus_IDLE, ServiceStatus_STOPPING:
  145. return os.ErrInvalid
  146. }
  147. case <-done:
  148. return os.ErrClosed
  149. }
  150. }
  151. }
  152. func (s *StartedService) StartOrReloadService(profileContent string, options *OverrideOptions) error {
  153. s.serviceAccess.Lock()
  154. switch s.serviceStatus.Status {
  155. case ServiceStatus_IDLE, ServiceStatus_STARTED, ServiceStatus_STARTING, ServiceStatus_FATAL:
  156. default:
  157. s.serviceAccess.Unlock()
  158. return os.ErrInvalid
  159. }
  160. oldInstance := s.instance
  161. if oldInstance != nil {
  162. s.updateStatus(ServiceStatus_STOPPING)
  163. s.serviceAccess.Unlock()
  164. _ = oldInstance.Close()
  165. s.serviceAccess.Lock()
  166. }
  167. s.updateStatus(ServiceStatus_STARTING)
  168. s.resetLogs()
  169. instance, err := s.newInstance(profileContent, options)
  170. if err != nil {
  171. return s.updateStatusError(err)
  172. }
  173. s.instance = instance
  174. instance.urlTestHistoryStorage.SetHook(s.urlTestSubscriber)
  175. if instance.clashServer != nil {
  176. instance.clashServer.SetModeUpdateHook(s.clashModeSubscriber)
  177. instance.clashServer.(*clashapi.Server).TrafficManager().SetEventHook(s.connectionEventSubscriber)
  178. }
  179. s.serviceAccess.Unlock()
  180. err = instance.Start()
  181. s.serviceAccess.Lock()
  182. if s.serviceStatus.Status != ServiceStatus_STARTING {
  183. s.serviceAccess.Unlock()
  184. return nil
  185. }
  186. if err != nil {
  187. return s.updateStatusError(err)
  188. }
  189. s.startedAt = time.Now()
  190. s.updateStatus(ServiceStatus_STARTED)
  191. s.serviceAccess.Unlock()
  192. runtime.GC()
  193. return nil
  194. }
  195. func (s *StartedService) Close() {
  196. s.serviceStatusSubscriber.Close()
  197. s.logSubscriber.Close()
  198. s.urlTestSubscriber.Close()
  199. s.clashModeSubscriber.Close()
  200. s.connectionEventSubscriber.Close()
  201. }
  202. func (s *StartedService) CloseService() error {
  203. s.serviceAccess.Lock()
  204. switch s.serviceStatus.Status {
  205. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  206. default:
  207. s.serviceAccess.Unlock()
  208. return os.ErrInvalid
  209. }
  210. s.updateStatus(ServiceStatus_STOPPING)
  211. instance := s.instance
  212. s.instance = nil
  213. if instance != nil {
  214. err := instance.Close()
  215. if err != nil {
  216. return s.updateStatusError(err)
  217. }
  218. }
  219. s.startedAt = time.Time{}
  220. s.updateStatus(ServiceStatus_IDLE)
  221. s.serviceAccess.Unlock()
  222. runtime.GC()
  223. return nil
  224. }
  225. func (s *StartedService) SetError(err error) {
  226. s.serviceAccess.Lock()
  227. s.updateStatusError(err)
  228. s.WriteMessage(log.LevelError, err.Error())
  229. }
  230. func (s *StartedService) StopService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  231. err := s.handler.ServiceStop()
  232. if err != nil {
  233. return nil, err
  234. }
  235. return &emptypb.Empty{}, nil
  236. }
  237. func (s *StartedService) ReloadService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  238. err := s.handler.ServiceReload()
  239. if err != nil {
  240. return nil, err
  241. }
  242. return &emptypb.Empty{}, nil
  243. }
  244. func (s *StartedService) SubscribeServiceStatus(empty *emptypb.Empty, server grpc.ServerStreamingServer[ServiceStatus]) error {
  245. subscription, done, err := s.serviceStatusObserver.Subscribe()
  246. if err != nil {
  247. return err
  248. }
  249. defer s.serviceStatusObserver.UnSubscribe(subscription)
  250. err = server.Send(s.serviceStatus)
  251. if err != nil {
  252. return err
  253. }
  254. for {
  255. select {
  256. case <-s.ctx.Done():
  257. return s.ctx.Err()
  258. case <-server.Context().Done():
  259. return server.Context().Err()
  260. case newStatus := <-subscription:
  261. err = server.Send(newStatus)
  262. if err != nil {
  263. return err
  264. }
  265. case <-done:
  266. return nil
  267. }
  268. }
  269. }
  270. func (s *StartedService) SubscribeLog(empty *emptypb.Empty, server grpc.ServerStreamingServer[Log]) error {
  271. var savedLines []*log.Entry
  272. s.logAccess.Lock()
  273. savedLines = make([]*log.Entry, 0, s.logLines.Len())
  274. for element := s.logLines.Front(); element != nil; element = element.Next() {
  275. savedLines = append(savedLines, element.Value)
  276. }
  277. subscription, done, err := s.logObserver.Subscribe()
  278. s.logAccess.Unlock()
  279. if err != nil {
  280. return err
  281. }
  282. defer s.logObserver.UnSubscribe(subscription)
  283. err = server.Send(&Log{
  284. Messages: common.Map(savedLines, func(it *log.Entry) *Log_Message {
  285. return &Log_Message{
  286. Level: LogLevel(it.Level),
  287. Message: it.Message,
  288. }
  289. }),
  290. Reset_: true,
  291. })
  292. if err != nil {
  293. return err
  294. }
  295. for {
  296. select {
  297. case <-s.ctx.Done():
  298. return s.ctx.Err()
  299. case <-server.Context().Done():
  300. return server.Context().Err()
  301. case message := <-subscription:
  302. var rawMessage Log
  303. if message == nil {
  304. rawMessage.Reset_ = true
  305. } else {
  306. rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
  307. Level: LogLevel(message.Level),
  308. Message: message.Message,
  309. })
  310. }
  311. fetch:
  312. for {
  313. select {
  314. case message = <-subscription:
  315. if message == nil {
  316. rawMessage.Messages = nil
  317. rawMessage.Reset_ = true
  318. } else {
  319. rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
  320. Level: LogLevel(message.Level),
  321. Message: message.Message,
  322. })
  323. }
  324. default:
  325. break fetch
  326. }
  327. }
  328. err = server.Send(&rawMessage)
  329. if err != nil {
  330. return err
  331. }
  332. case <-done:
  333. return nil
  334. }
  335. }
  336. }
  337. func (s *StartedService) GetDefaultLogLevel(ctx context.Context, empty *emptypb.Empty) (*DefaultLogLevel, error) {
  338. s.serviceAccess.RLock()
  339. switch s.serviceStatus.Status {
  340. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  341. default:
  342. s.serviceAccess.RUnlock()
  343. return nil, os.ErrInvalid
  344. }
  345. logLevel := s.instance.instance.LogFactory().Level()
  346. s.serviceAccess.RUnlock()
  347. return &DefaultLogLevel{Level: LogLevel(logLevel)}, nil
  348. }
  349. func (s *StartedService) ClearLogs(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  350. s.resetLogs()
  351. return &emptypb.Empty{}, nil
  352. }
  353. func (s *StartedService) SubscribeStatus(request *SubscribeStatusRequest, server grpc.ServerStreamingServer[Status]) error {
  354. interval := time.Duration(request.Interval)
  355. if interval <= 0 {
  356. interval = time.Second // Default to 1 second
  357. }
  358. ticker := time.NewTicker(interval)
  359. defer ticker.Stop()
  360. status := s.readStatus()
  361. uploadTotal := status.UplinkTotal
  362. downloadTotal := status.DownlinkTotal
  363. for {
  364. err := server.Send(status)
  365. if err != nil {
  366. return err
  367. }
  368. select {
  369. case <-s.ctx.Done():
  370. return s.ctx.Err()
  371. case <-server.Context().Done():
  372. return server.Context().Err()
  373. case <-ticker.C:
  374. }
  375. status = s.readStatus()
  376. upload := status.UplinkTotal - uploadTotal
  377. download := status.DownlinkTotal - downloadTotal
  378. uploadTotal = status.UplinkTotal
  379. downloadTotal = status.DownlinkTotal
  380. status.Uplink = upload
  381. status.Downlink = download
  382. }
  383. }
  384. func (s *StartedService) readStatus() *Status {
  385. var status Status
  386. status.Memory = memory.Total()
  387. status.Goroutines = int32(runtime.NumGoroutine())
  388. s.serviceAccess.RLock()
  389. nowService := s.instance
  390. s.serviceAccess.RUnlock()
  391. if nowService != nil && nowService.connectionManager != nil {
  392. status.ConnectionsOut = int32(nowService.connectionManager.Count())
  393. }
  394. if nowService != nil {
  395. if clashServer := nowService.clashServer; clashServer != nil {
  396. status.TrafficAvailable = true
  397. trafficManager := clashServer.(*clashapi.Server).TrafficManager()
  398. status.UplinkTotal, status.DownlinkTotal = trafficManager.Total()
  399. status.ConnectionsIn = int32(trafficManager.ConnectionsLen())
  400. }
  401. }
  402. return &status
  403. }
  404. func (s *StartedService) SubscribeGroups(empty *emptypb.Empty, server grpc.ServerStreamingServer[Groups]) error {
  405. err := s.waitForStarted(server.Context())
  406. if err != nil {
  407. return err
  408. }
  409. subscription, done, err := s.urlTestObserver.Subscribe()
  410. if err != nil {
  411. return err
  412. }
  413. defer s.urlTestObserver.UnSubscribe(subscription)
  414. for {
  415. s.serviceAccess.RLock()
  416. if s.serviceStatus.Status != ServiceStatus_STARTED {
  417. s.serviceAccess.RUnlock()
  418. return os.ErrInvalid
  419. }
  420. groups := s.readGroups()
  421. s.serviceAccess.RUnlock()
  422. err = server.Send(groups)
  423. if err != nil {
  424. return err
  425. }
  426. select {
  427. case <-subscription:
  428. case <-s.ctx.Done():
  429. return s.ctx.Err()
  430. case <-server.Context().Done():
  431. return server.Context().Err()
  432. case <-done:
  433. return nil
  434. }
  435. }
  436. }
  437. func (s *StartedService) readGroups() *Groups {
  438. historyStorage := s.instance.urlTestHistoryStorage
  439. boxService := s.instance
  440. outbounds := boxService.instance.Outbound().Outbounds()
  441. var iGroups []adapter.OutboundGroup
  442. for _, it := range outbounds {
  443. if group, isGroup := it.(adapter.OutboundGroup); isGroup {
  444. iGroups = append(iGroups, group)
  445. }
  446. }
  447. var gs Groups
  448. for _, iGroup := range iGroups {
  449. var g Group
  450. g.Tag = iGroup.Tag()
  451. g.Type = iGroup.Type()
  452. _, g.Selectable = iGroup.(*group.Selector)
  453. g.Selected = iGroup.Now()
  454. if boxService.cacheFile != nil {
  455. if isExpand, loaded := boxService.cacheFile.LoadGroupExpand(g.Tag); loaded {
  456. g.IsExpand = isExpand
  457. }
  458. }
  459. for _, itemTag := range iGroup.All() {
  460. itemOutbound, isLoaded := boxService.instance.Outbound().Outbound(itemTag)
  461. if !isLoaded {
  462. continue
  463. }
  464. var item GroupItem
  465. item.Tag = itemTag
  466. item.Type = itemOutbound.Type()
  467. if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(itemOutbound)); history != nil {
  468. item.UrlTestTime = history.Time.Unix()
  469. item.UrlTestDelay = int32(history.Delay)
  470. }
  471. g.Items = append(g.Items, &item)
  472. }
  473. if len(g.Items) < 2 {
  474. continue
  475. }
  476. gs.Group = append(gs.Group, &g)
  477. }
  478. return &gs
  479. }
  480. func (s *StartedService) GetClashModeStatus(ctx context.Context, empty *emptypb.Empty) (*ClashModeStatus, error) {
  481. s.serviceAccess.RLock()
  482. if s.serviceStatus.Status != ServiceStatus_STARTED {
  483. s.serviceAccess.RUnlock()
  484. return nil, os.ErrInvalid
  485. }
  486. clashServer := s.instance.clashServer
  487. s.serviceAccess.RUnlock()
  488. if clashServer == nil {
  489. return nil, os.ErrInvalid
  490. }
  491. return &ClashModeStatus{
  492. ModeList: clashServer.ModeList(),
  493. CurrentMode: clashServer.Mode(),
  494. }, nil
  495. }
  496. func (s *StartedService) SubscribeClashMode(empty *emptypb.Empty, server grpc.ServerStreamingServer[ClashMode]) error {
  497. err := s.waitForStarted(server.Context())
  498. if err != nil {
  499. return err
  500. }
  501. subscription, done, err := s.clashModeObserver.Subscribe()
  502. if err != nil {
  503. return err
  504. }
  505. defer s.clashModeObserver.UnSubscribe(subscription)
  506. for {
  507. s.serviceAccess.RLock()
  508. if s.serviceStatus.Status != ServiceStatus_STARTED {
  509. s.serviceAccess.RUnlock()
  510. return os.ErrInvalid
  511. }
  512. message := &ClashMode{Mode: s.instance.clashServer.Mode()}
  513. s.serviceAccess.RUnlock()
  514. err = server.Send(message)
  515. if err != nil {
  516. return err
  517. }
  518. select {
  519. case <-subscription:
  520. case <-s.ctx.Done():
  521. return s.ctx.Err()
  522. case <-server.Context().Done():
  523. return server.Context().Err()
  524. case <-done:
  525. return nil
  526. }
  527. }
  528. }
  529. func (s *StartedService) SetClashMode(ctx context.Context, request *ClashMode) (*emptypb.Empty, error) {
  530. s.serviceAccess.RLock()
  531. if s.serviceStatus.Status != ServiceStatus_STARTED {
  532. s.serviceAccess.RUnlock()
  533. return nil, os.ErrInvalid
  534. }
  535. clashServer := s.instance.clashServer
  536. s.serviceAccess.RUnlock()
  537. clashServer.(*clashapi.Server).SetMode(request.Mode)
  538. return &emptypb.Empty{}, nil
  539. }
  540. func (s *StartedService) URLTest(ctx context.Context, request *URLTestRequest) (*emptypb.Empty, error) {
  541. s.serviceAccess.RLock()
  542. if s.serviceStatus.Status != ServiceStatus_STARTED {
  543. s.serviceAccess.RUnlock()
  544. return nil, os.ErrInvalid
  545. }
  546. boxService := s.instance
  547. s.serviceAccess.RUnlock()
  548. groupTag := request.OutboundTag
  549. abstractOutboundGroup, isLoaded := boxService.instance.Outbound().Outbound(groupTag)
  550. if !isLoaded {
  551. return nil, E.New("outbound group not found: ", groupTag)
  552. }
  553. outboundGroup, isOutboundGroup := abstractOutboundGroup.(adapter.OutboundGroup)
  554. if !isOutboundGroup {
  555. return nil, E.New("outbound is not a group: ", groupTag)
  556. }
  557. urlTest, isURLTest := abstractOutboundGroup.(*group.URLTest)
  558. if isURLTest {
  559. go urlTest.CheckOutbounds()
  560. } else {
  561. historyStorage := boxService.urlTestHistoryStorage
  562. outbounds := common.Filter(common.Map(outboundGroup.All(), func(it string) adapter.Outbound {
  563. itOutbound, _ := boxService.instance.Outbound().Outbound(it)
  564. return itOutbound
  565. }), func(it adapter.Outbound) bool {
  566. if it == nil {
  567. return false
  568. }
  569. _, isGroup := it.(adapter.OutboundGroup)
  570. if isGroup {
  571. return false
  572. }
  573. return true
  574. })
  575. b, _ := batch.New(boxService.ctx, batch.WithConcurrencyNum[any](10))
  576. for _, detour := range outbounds {
  577. outboundToTest := detour
  578. outboundTag := outboundToTest.Tag()
  579. b.Go(outboundTag, func() (any, error) {
  580. t, err := urltest.URLTest(boxService.ctx, "", outboundToTest)
  581. if err != nil {
  582. historyStorage.DeleteURLTestHistory(outboundTag)
  583. } else {
  584. historyStorage.StoreURLTestHistory(outboundTag, &adapter.URLTestHistory{
  585. Time: time.Now(),
  586. Delay: t,
  587. })
  588. }
  589. return nil, nil
  590. })
  591. }
  592. }
  593. return &emptypb.Empty{}, nil
  594. }
  595. func (s *StartedService) SelectOutbound(ctx context.Context, request *SelectOutboundRequest) (*emptypb.Empty, error) {
  596. s.serviceAccess.RLock()
  597. switch s.serviceStatus.Status {
  598. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  599. default:
  600. s.serviceAccess.RUnlock()
  601. return nil, os.ErrInvalid
  602. }
  603. boxService := s.instance.instance
  604. s.serviceAccess.RUnlock()
  605. outboundGroup, isLoaded := boxService.Outbound().Outbound(request.GroupTag)
  606. if !isLoaded {
  607. return nil, E.New("selector not found: ", request.GroupTag)
  608. }
  609. selector, isSelector := outboundGroup.(*group.Selector)
  610. if !isSelector {
  611. return nil, E.New("outbound is not a selector: ", request.GroupTag)
  612. }
  613. if !selector.SelectOutbound(request.OutboundTag) {
  614. return nil, E.New("outbound not found in selector: ", request.OutboundTag)
  615. }
  616. s.urlTestObserver.Emit(struct{}{})
  617. return &emptypb.Empty{}, nil
  618. }
  619. func (s *StartedService) SetGroupExpand(ctx context.Context, request *SetGroupExpandRequest) (*emptypb.Empty, error) {
  620. s.serviceAccess.RLock()
  621. switch s.serviceStatus.Status {
  622. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  623. default:
  624. s.serviceAccess.RUnlock()
  625. return nil, os.ErrInvalid
  626. }
  627. boxService := s.instance
  628. s.serviceAccess.RUnlock()
  629. if boxService.cacheFile != nil {
  630. err := boxService.cacheFile.StoreGroupExpand(request.GroupTag, request.IsExpand)
  631. if err != nil {
  632. return nil, err
  633. }
  634. }
  635. return &emptypb.Empty{}, nil
  636. }
  637. func (s *StartedService) GetSystemProxyStatus(ctx context.Context, empty *emptypb.Empty) (*SystemProxyStatus, error) {
  638. return s.handler.SystemProxyStatus()
  639. }
  640. func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *SetSystemProxyEnabledRequest) (*emptypb.Empty, error) {
  641. err := s.handler.SetSystemProxyEnabled(request.Enabled)
  642. if err != nil {
  643. return nil, err
  644. }
  645. return nil, err
  646. }
  647. func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[ConnectionEvents]) error {
  648. err := s.waitForStarted(server.Context())
  649. if err != nil {
  650. return err
  651. }
  652. s.serviceAccess.RLock()
  653. boxService := s.instance
  654. s.serviceAccess.RUnlock()
  655. if boxService.clashServer == nil {
  656. return E.New("clash server not available")
  657. }
  658. trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
  659. subscription, done, err := s.connectionEventObserver.Subscribe()
  660. if err != nil {
  661. return err
  662. }
  663. defer s.connectionEventObserver.UnSubscribe(subscription)
  664. connectionSnapshots := make(map[uuid.UUID]connectionSnapshot)
  665. initialEvents := s.buildInitialConnectionState(trafficManager, connectionSnapshots)
  666. err = server.Send(&ConnectionEvents{
  667. Events: initialEvents,
  668. Reset_: true,
  669. })
  670. if err != nil {
  671. return err
  672. }
  673. interval := time.Duration(request.Interval)
  674. if interval <= 0 {
  675. interval = time.Second
  676. }
  677. ticker := time.NewTicker(interval)
  678. defer ticker.Stop()
  679. for {
  680. select {
  681. case <-s.ctx.Done():
  682. return s.ctx.Err()
  683. case <-server.Context().Done():
  684. return server.Context().Err()
  685. case <-done:
  686. return nil
  687. case event := <-subscription:
  688. var pendingEvents []*ConnectionEvent
  689. if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
  690. pendingEvents = append(pendingEvents, protoEvent)
  691. }
  692. drain:
  693. for {
  694. select {
  695. case event = <-subscription:
  696. if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
  697. pendingEvents = append(pendingEvents, protoEvent)
  698. }
  699. default:
  700. break drain
  701. }
  702. }
  703. if len(pendingEvents) > 0 {
  704. err = server.Send(&ConnectionEvents{Events: pendingEvents})
  705. if err != nil {
  706. return err
  707. }
  708. }
  709. case <-ticker.C:
  710. protoEvents := s.buildTrafficUpdates(trafficManager, connectionSnapshots)
  711. if len(protoEvents) == 0 {
  712. continue
  713. }
  714. err = server.Send(&ConnectionEvents{Events: protoEvents})
  715. if err != nil {
  716. return err
  717. }
  718. }
  719. }
  720. }
  721. type connectionSnapshot struct {
  722. uplink int64
  723. downlink int64
  724. hadTraffic bool
  725. }
  726. func (s *StartedService) buildInitialConnectionState(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
  727. var events []*ConnectionEvent
  728. for _, metadata := range manager.Connections() {
  729. events = append(events, &ConnectionEvent{
  730. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  731. Id: metadata.ID.String(),
  732. Connection: buildConnectionProto(metadata),
  733. })
  734. snapshots[metadata.ID] = connectionSnapshot{
  735. uplink: metadata.Upload.Load(),
  736. downlink: metadata.Download.Load(),
  737. }
  738. }
  739. for _, metadata := range manager.ClosedConnections() {
  740. conn := buildConnectionProto(metadata)
  741. conn.ClosedAt = metadata.ClosedAt.UnixMilli()
  742. events = append(events, &ConnectionEvent{
  743. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  744. Id: metadata.ID.String(),
  745. Connection: conn,
  746. })
  747. }
  748. return events
  749. }
  750. func (s *StartedService) applyConnectionEvent(event trafficontrol.ConnectionEvent, snapshots map[uuid.UUID]connectionSnapshot) *ConnectionEvent {
  751. switch event.Type {
  752. case trafficontrol.ConnectionEventNew:
  753. if _, exists := snapshots[event.ID]; exists {
  754. return nil
  755. }
  756. snapshots[event.ID] = connectionSnapshot{
  757. uplink: event.Metadata.Upload.Load(),
  758. downlink: event.Metadata.Download.Load(),
  759. }
  760. return &ConnectionEvent{
  761. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  762. Id: event.ID.String(),
  763. Connection: buildConnectionProto(event.Metadata),
  764. }
  765. case trafficontrol.ConnectionEventClosed:
  766. delete(snapshots, event.ID)
  767. protoEvent := &ConnectionEvent{
  768. Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
  769. Id: event.ID.String(),
  770. }
  771. closedAt := event.ClosedAt
  772. if closedAt.IsZero() && !event.Metadata.ClosedAt.IsZero() {
  773. closedAt = event.Metadata.ClosedAt
  774. }
  775. if closedAt.IsZero() {
  776. closedAt = time.Now()
  777. }
  778. protoEvent.ClosedAt = closedAt.UnixMilli()
  779. if event.Metadata.ID != uuid.Nil {
  780. conn := buildConnectionProto(event.Metadata)
  781. conn.ClosedAt = protoEvent.ClosedAt
  782. protoEvent.Connection = conn
  783. }
  784. return protoEvent
  785. default:
  786. return nil
  787. }
  788. }
  789. func (s *StartedService) buildTrafficUpdates(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
  790. activeConnections := manager.Connections()
  791. activeIndex := make(map[uuid.UUID]*trafficontrol.TrackerMetadata, len(activeConnections))
  792. var events []*ConnectionEvent
  793. for _, metadata := range activeConnections {
  794. activeIndex[metadata.ID] = metadata
  795. currentUpload := metadata.Upload.Load()
  796. currentDownload := metadata.Download.Load()
  797. snapshot, exists := snapshots[metadata.ID]
  798. if !exists {
  799. snapshots[metadata.ID] = connectionSnapshot{
  800. uplink: currentUpload,
  801. downlink: currentDownload,
  802. }
  803. events = append(events, &ConnectionEvent{
  804. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  805. Id: metadata.ID.String(),
  806. Connection: buildConnectionProto(metadata),
  807. })
  808. continue
  809. }
  810. uplinkDelta := currentUpload - snapshot.uplink
  811. downlinkDelta := currentDownload - snapshot.downlink
  812. if uplinkDelta < 0 || downlinkDelta < 0 {
  813. if snapshot.hadTraffic {
  814. events = append(events, &ConnectionEvent{
  815. Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
  816. Id: metadata.ID.String(),
  817. UplinkDelta: 0,
  818. DownlinkDelta: 0,
  819. })
  820. }
  821. snapshot.uplink = currentUpload
  822. snapshot.downlink = currentDownload
  823. snapshot.hadTraffic = false
  824. snapshots[metadata.ID] = snapshot
  825. continue
  826. }
  827. if uplinkDelta > 0 || downlinkDelta > 0 {
  828. snapshot.uplink = currentUpload
  829. snapshot.downlink = currentDownload
  830. snapshot.hadTraffic = true
  831. snapshots[metadata.ID] = snapshot
  832. events = append(events, &ConnectionEvent{
  833. Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
  834. Id: metadata.ID.String(),
  835. UplinkDelta: uplinkDelta,
  836. DownlinkDelta: downlinkDelta,
  837. })
  838. continue
  839. }
  840. if snapshot.hadTraffic {
  841. snapshot.uplink = currentUpload
  842. snapshot.downlink = currentDownload
  843. snapshot.hadTraffic = false
  844. snapshots[metadata.ID] = snapshot
  845. events = append(events, &ConnectionEvent{
  846. Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
  847. Id: metadata.ID.String(),
  848. UplinkDelta: 0,
  849. DownlinkDelta: 0,
  850. })
  851. }
  852. }
  853. var closedIndex map[uuid.UUID]*trafficontrol.TrackerMetadata
  854. for id := range snapshots {
  855. if _, exists := activeIndex[id]; exists {
  856. continue
  857. }
  858. if closedIndex == nil {
  859. closedIndex = make(map[uuid.UUID]*trafficontrol.TrackerMetadata)
  860. for _, metadata := range manager.ClosedConnections() {
  861. closedIndex[metadata.ID] = metadata
  862. }
  863. }
  864. closedAt := time.Now()
  865. var conn *Connection
  866. if metadata, ok := closedIndex[id]; ok {
  867. if !metadata.ClosedAt.IsZero() {
  868. closedAt = metadata.ClosedAt
  869. }
  870. conn = buildConnectionProto(metadata)
  871. conn.ClosedAt = closedAt.UnixMilli()
  872. }
  873. events = append(events, &ConnectionEvent{
  874. Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
  875. Id: id.String(),
  876. ClosedAt: closedAt.UnixMilli(),
  877. Connection: conn,
  878. })
  879. delete(snapshots, id)
  880. }
  881. return events
  882. }
  883. func buildConnectionProto(metadata *trafficontrol.TrackerMetadata) *Connection {
  884. var rule string
  885. if metadata.Rule != nil {
  886. rule = metadata.Rule.String()
  887. }
  888. uplinkTotal := metadata.Upload.Load()
  889. downlinkTotal := metadata.Download.Load()
  890. var processInfo *ProcessInfo
  891. if metadata.Metadata.ProcessInfo != nil {
  892. processInfo = &ProcessInfo{
  893. ProcessId: metadata.Metadata.ProcessInfo.ProcessID,
  894. UserId: metadata.Metadata.ProcessInfo.UserId,
  895. UserName: metadata.Metadata.ProcessInfo.UserName,
  896. ProcessPath: metadata.Metadata.ProcessInfo.ProcessPath,
  897. PackageNames: metadata.Metadata.ProcessInfo.AndroidPackageNames,
  898. }
  899. }
  900. return &Connection{
  901. Id: metadata.ID.String(),
  902. Inbound: metadata.Metadata.Inbound,
  903. InboundType: metadata.Metadata.InboundType,
  904. IpVersion: int32(metadata.Metadata.IPVersion),
  905. Network: metadata.Metadata.Network,
  906. Source: metadata.Metadata.Source.String(),
  907. Destination: metadata.Metadata.Destination.String(),
  908. Domain: metadata.Metadata.Domain,
  909. Protocol: metadata.Metadata.Protocol,
  910. User: metadata.Metadata.User,
  911. FromOutbound: metadata.Metadata.Outbound,
  912. CreatedAt: metadata.CreatedAt.UnixMilli(),
  913. UplinkTotal: uplinkTotal,
  914. DownlinkTotal: downlinkTotal,
  915. Rule: rule,
  916. Outbound: metadata.Outbound,
  917. OutboundType: metadata.OutboundType,
  918. ChainList: metadata.Chain,
  919. ProcessInfo: processInfo,
  920. }
  921. }
  922. func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {
  923. s.serviceAccess.RLock()
  924. switch s.serviceStatus.Status {
  925. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  926. default:
  927. s.serviceAccess.RUnlock()
  928. return nil, os.ErrInvalid
  929. }
  930. boxService := s.instance
  931. s.serviceAccess.RUnlock()
  932. targetConn := boxService.clashServer.(*clashapi.Server).TrafficManager().Connection(uuid.FromStringOrNil(request.Id))
  933. if targetConn != nil {
  934. targetConn.Close()
  935. }
  936. return &emptypb.Empty{}, nil
  937. }
  938. func (s *StartedService) CloseAllConnections(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  939. s.serviceAccess.RLock()
  940. nowService := s.instance
  941. s.serviceAccess.RUnlock()
  942. if nowService != nil && nowService.connectionManager != nil {
  943. nowService.connectionManager.CloseAll()
  944. }
  945. return &emptypb.Empty{}, nil
  946. }
  947. func (s *StartedService) GetDeprecatedWarnings(ctx context.Context, empty *emptypb.Empty) (*DeprecatedWarnings, error) {
  948. s.serviceAccess.RLock()
  949. if s.serviceStatus.Status != ServiceStatus_STARTED {
  950. s.serviceAccess.RUnlock()
  951. return nil, os.ErrInvalid
  952. }
  953. boxService := s.instance
  954. s.serviceAccess.RUnlock()
  955. notes := service.FromContext[deprecated.Manager](boxService.ctx).(*deprecatedManager).Get()
  956. return &DeprecatedWarnings{
  957. Warnings: common.Map(notes, func(it deprecated.Note) *DeprecatedWarning {
  958. return &DeprecatedWarning{
  959. Message: it.Message(),
  960. Impending: it.Impending(),
  961. MigrationLink: it.MigrationLink,
  962. }
  963. }),
  964. }, nil
  965. }
  966. func (s *StartedService) GetStartedAt(ctx context.Context, empty *emptypb.Empty) (*StartedAt, error) {
  967. s.serviceAccess.RLock()
  968. defer s.serviceAccess.RUnlock()
  969. return &StartedAt{StartedAt: s.startedAt.UnixMilli()}, nil
  970. }
  971. func (s *StartedService) mustEmbedUnimplementedStartedServiceServer() {
  972. }
  973. func (s *StartedService) WriteMessage(level log.Level, message string) {
  974. item := &log.Entry{Level: level, Message: message}
  975. s.logAccess.Lock()
  976. s.logLines.PushBack(item)
  977. if s.logLines.Len() > s.logMaxLines {
  978. s.logLines.Remove(s.logLines.Front())
  979. }
  980. s.logAccess.Unlock()
  981. s.logSubscriber.Emit(item)
  982. if s.debug {
  983. s.handler.WriteDebugMessage(message)
  984. }
  985. }
  986. func (s *StartedService) Instance() *Instance {
  987. s.serviceAccess.RLock()
  988. defer s.serviceAccess.RUnlock()
  989. return s.instance
  990. }