started_service.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489
  1. package daemon
  2. import (
  3. "context"
  4. "os"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "unsafe"
  9. "github.com/sagernet/sing-box/adapter"
  10. "github.com/sagernet/sing-box/common/dialer"
  11. "github.com/sagernet/sing-box/common/networkquality"
  12. "github.com/sagernet/sing-box/common/stun"
  13. "github.com/sagernet/sing-box/common/urltest"
  14. C "github.com/sagernet/sing-box/constant"
  15. "github.com/sagernet/sing-box/experimental/clashapi"
  16. "github.com/sagernet/sing-box/experimental/clashapi/trafficontrol"
  17. "github.com/sagernet/sing-box/experimental/deprecated"
  18. "github.com/sagernet/sing-box/log"
  19. "github.com/sagernet/sing-box/protocol/group"
  20. "github.com/sagernet/sing-box/service/oomkiller"
  21. "github.com/sagernet/sing/common"
  22. "github.com/sagernet/sing/common/batch"
  23. E "github.com/sagernet/sing/common/exceptions"
  24. "github.com/sagernet/sing/common/memory"
  25. "github.com/sagernet/sing/common/observable"
  26. "github.com/sagernet/sing/common/x/list"
  27. "github.com/sagernet/sing/service"
  28. "github.com/gofrs/uuid/v5"
  29. "google.golang.org/grpc"
  30. "google.golang.org/grpc/codes"
  31. "google.golang.org/grpc/status"
  32. "google.golang.org/protobuf/types/known/emptypb"
  33. )
// Compile-time assertion that *StartedService implements StartedServiceServer.
var _ StartedServiceServer = (*StartedService)(nil)
// StartedService is the StartedServiceServer implementation. It tracks the
// current service status and running Instance, buffers log entries, and owns
// one subscriber/observer pair per event stream pushed to streaming clients.
type StartedService struct {
	// ctx is the service-wide context; streaming RPCs return ctx.Err() once
	// it is done.
	ctx context.Context
	// platform adapter.PlatformInterface
	// handler receives the platform callbacks used by the stop, reload,
	// system-proxy and native-crash RPCs.
	handler PlatformHandler
	// debug gates TriggerDebugCrash.
	debug bool
	// Options below are copied from ServiceOptions by NewStartedService.
	logMaxLines       int
	oomKillerEnabled  bool
	oomKillerDisabled bool
	oomMemoryLimit    uint64
	// workingDirectory string
	// tempDirectory string
	// userID int
	// groupID int
	// systemProxyEnabled bool

	// serviceAccess is held while serviceStatus, instance and startedAt are
	// updated by StartOrReloadService and CloseService.
	serviceAccess sync.RWMutex
	// serviceStatus is replaced with a fresh object on every status change.
	serviceStatus           *ServiceStatus
	serviceStatusSubscriber *observable.Subscriber[*ServiceStatus]
	serviceStatusObserver   *observable.Observer[*ServiceStatus]

	// logAccess guards logLines.
	logAccess sync.RWMutex
	// logLines holds the buffered entries that SubscribeLog replays to a new
	// subscriber. A nil entry emitted on logSubscriber signals a reset.
	logLines      list.List[*log.Entry]
	logSubscriber *observable.Subscriber[*log.Entry]
	logObserver   *observable.Observer[*log.Entry]

	// instance is the currently running service instance, or nil.
	instance *Instance
	// startedAt is set when the service reaches STARTED and zeroed by
	// CloseService.
	startedAt time.Time

	// urlTestSubscriber is installed as the hook of the instance's URL test
	// history storage; each event triggers a fresh Groups snapshot.
	urlTestSubscriber     *observable.Subscriber[struct{}]
	urlTestObserver       *observable.Observer[struct{}]
	urlTestHistoryStorage *urltest.HistoryStorage

	// clashModeSubscriber is installed as the clash server's mode-update hook.
	clashModeSubscriber *observable.Subscriber[struct{}]
	clashModeObserver   *observable.Observer[struct{}]

	// connectionEventSubscriber is installed as the traffic manager's event
	// hook.
	connectionEventSubscriber *observable.Subscriber[trafficontrol.ConnectionEvent]
	connectionEventObserver   *observable.Observer[trafficontrol.ConnectionEvent]
}
// ServiceOptions carries the settings NewStartedService copies into a new
// StartedService.
type ServiceOptions struct {
	// Context becomes the service-wide context of the StartedService.
	Context context.Context
	// Platform adapter.PlatformInterface
	// Handler receives the platform callbacks issued by the service.
	Handler PlatformHandler
	// Debug enables the TriggerDebugCrash RPC.
	Debug             bool
	LogMaxLines       int
	OOMKillerEnabled  bool
	OOMKillerDisabled bool
	OOMMemoryLimit    uint64
	// WorkingDirectory string
	// TempDirectory string
	// UserID int
	// GroupID int
	// SystemProxyEnabled bool
}
  82. func NewStartedService(options ServiceOptions) *StartedService {
  83. s := &StartedService{
  84. ctx: options.Context,
  85. // platform: options.Platform,
  86. handler: options.Handler,
  87. debug: options.Debug,
  88. logMaxLines: options.LogMaxLines,
  89. oomKillerEnabled: options.OOMKillerEnabled,
  90. oomKillerDisabled: options.OOMKillerDisabled,
  91. oomMemoryLimit: options.OOMMemoryLimit,
  92. // workingDirectory: options.WorkingDirectory,
  93. // tempDirectory: options.TempDirectory,
  94. // userID: options.UserID,
  95. // groupID: options.GroupID,
  96. // systemProxyEnabled: options.SystemProxyEnabled,
  97. serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
  98. serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
  99. logSubscriber: observable.NewSubscriber[*log.Entry](128),
  100. urlTestSubscriber: observable.NewSubscriber[struct{}](1),
  101. urlTestHistoryStorage: urltest.NewHistoryStorage(),
  102. clashModeSubscriber: observable.NewSubscriber[struct{}](1),
  103. connectionEventSubscriber: observable.NewSubscriber[trafficontrol.ConnectionEvent](256),
  104. }
  105. s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
  106. s.logObserver = observable.NewObserver(s.logSubscriber, 64)
  107. s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
  108. s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
  109. s.connectionEventObserver = observable.NewObserver(s.connectionEventSubscriber, 64)
  110. return s
  111. }
  112. func (s *StartedService) resetLogs() {
  113. s.logAccess.Lock()
  114. s.logLines = list.List[*log.Entry]{}
  115. s.logAccess.Unlock()
  116. s.logSubscriber.Emit(nil)
  117. }
  118. func (s *StartedService) updateStatus(newStatus ServiceStatus_Type) {
  119. statusObject := &ServiceStatus{Status: newStatus}
  120. s.serviceStatusSubscriber.Emit(statusObject)
  121. s.serviceStatus = statusObject
  122. }
  123. func (s *StartedService) updateStatusError(err error) error {
  124. statusObject := &ServiceStatus{Status: ServiceStatus_FATAL, ErrorMessage: err.Error()}
  125. s.serviceStatusSubscriber.Emit(statusObject)
  126. s.serviceStatus = statusObject
  127. s.serviceAccess.Unlock()
  128. return err
  129. }
  130. func (s *StartedService) waitForStarted(ctx context.Context) error {
  131. s.serviceAccess.RLock()
  132. currentStatus := s.serviceStatus.Status
  133. s.serviceAccess.RUnlock()
  134. switch currentStatus {
  135. case ServiceStatus_STARTED:
  136. return nil
  137. case ServiceStatus_STARTING:
  138. default:
  139. return os.ErrInvalid
  140. }
  141. subscription, done, err := s.serviceStatusObserver.Subscribe()
  142. if err != nil {
  143. return err
  144. }
  145. defer s.serviceStatusObserver.UnSubscribe(subscription)
  146. for {
  147. select {
  148. case <-ctx.Done():
  149. return ctx.Err()
  150. case <-s.ctx.Done():
  151. return s.ctx.Err()
  152. case status := <-subscription:
  153. switch status.Status {
  154. case ServiceStatus_STARTED:
  155. return nil
  156. case ServiceStatus_FATAL:
  157. return E.New(status.ErrorMessage)
  158. case ServiceStatus_IDLE, ServiceStatus_STOPPING:
  159. return os.ErrInvalid
  160. }
  161. case <-done:
  162. return os.ErrClosed
  163. }
  164. }
  165. }
// StartOrReloadService creates an Instance from profileContent and options
// and starts it, first closing any instance that is already present.
//
// It returns os.ErrInvalid when the current status is not IDLE, STARTED,
// STARTING or FATAL. An error from newInstance or Instance.Start is recorded
// through updateStatusError (status FATAL) and returned.
func (s *StartedService) StartOrReloadService(profileContent string, options *OverrideOptions) error {
	s.serviceAccess.Lock()
	switch s.serviceStatus.Status {
	case ServiceStatus_IDLE, ServiceStatus_STARTED, ServiceStatus_STARTING, ServiceStatus_FATAL:
	default:
		s.serviceAccess.Unlock()
		return os.ErrInvalid
	}
	oldInstance := s.instance
	if oldInstance != nil {
		s.updateStatus(ServiceStatus_STOPPING)
		// The lock is released while the old instance shuts down; its Close
		// error is deliberately ignored.
		s.serviceAccess.Unlock()
		_ = oldInstance.Close()
		s.serviceAccess.Lock()
	}
	s.updateStatus(ServiceStatus_STARTING)
	s.resetLogs()
	instance, err := s.newInstance(profileContent, options)
	if err != nil {
		// updateStatusError releases serviceAccess before returning.
		return s.updateStatusError(err)
	}
	s.instance = instance
	// Route instance events into this service's subscribers.
	instance.urlTestHistoryStorage.SetHook(s.urlTestSubscriber)
	if instance.clashServer != nil {
		instance.clashServer.SetModeUpdateHook(s.clashModeSubscriber)
		instance.clashServer.(*clashapi.Server).TrafficManager().SetEventHook(s.connectionEventSubscriber)
	}
	// Start runs without the lock held.
	s.serviceAccess.Unlock()
	err = instance.Start()
	s.serviceAccess.Lock()
	// If the status is no longer STARTING, something else changed it while
	// Start was running; return without reporting the Start result.
	if s.serviceStatus.Status != ServiceStatus_STARTING {
		s.serviceAccess.Unlock()
		return nil
	}
	if err != nil {
		return s.updateStatusError(err)
	}
	s.startedAt = time.Now()
	s.updateStatus(ServiceStatus_STARTED)
	s.serviceAccess.Unlock()
	runtime.GC()
	return nil
}
// Close closes every event subscriber owned by the service. It does not stop
// the running instance; see CloseService for that.
func (s *StartedService) Close() {
	s.serviceStatusSubscriber.Close()
	s.logSubscriber.Close()
	s.urlTestSubscriber.Close()
	s.clashModeSubscriber.Close()
	s.connectionEventSubscriber.Close()
}
  216. func (s *StartedService) CloseService() error {
  217. s.serviceAccess.Lock()
  218. switch s.serviceStatus.Status {
  219. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  220. default:
  221. s.serviceAccess.Unlock()
  222. return os.ErrInvalid
  223. }
  224. s.updateStatus(ServiceStatus_STOPPING)
  225. instance := s.instance
  226. s.instance = nil
  227. if instance != nil {
  228. err := instance.Close()
  229. if err != nil {
  230. return s.updateStatusError(err)
  231. }
  232. }
  233. s.startedAt = time.Time{}
  234. s.updateStatus(ServiceStatus_IDLE)
  235. s.serviceAccess.Unlock()
  236. runtime.GC()
  237. return nil
  238. }
// SetError records err as a FATAL service status and writes it to the log at
// error level.
func (s *StartedService) SetError(err error) {
	s.serviceAccess.Lock()
	// updateStatusError releases serviceAccess, so no Unlock follows here.
	s.updateStatusError(err)
	s.WriteMessage(log.LevelError, err.Error())
}
  244. func (s *StartedService) StopService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  245. err := s.handler.ServiceStop()
  246. if err != nil {
  247. return nil, err
  248. }
  249. return &emptypb.Empty{}, nil
  250. }
  251. func (s *StartedService) ReloadService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  252. err := s.handler.ServiceReload()
  253. if err != nil {
  254. return nil, err
  255. }
  256. return &emptypb.Empty{}, nil
  257. }
  258. func (s *StartedService) SubscribeServiceStatus(empty *emptypb.Empty, server grpc.ServerStreamingServer[ServiceStatus]) error {
  259. subscription, done, err := s.serviceStatusObserver.Subscribe()
  260. if err != nil {
  261. return err
  262. }
  263. defer s.serviceStatusObserver.UnSubscribe(subscription)
  264. err = server.Send(s.serviceStatus)
  265. if err != nil {
  266. return err
  267. }
  268. for {
  269. select {
  270. case <-s.ctx.Done():
  271. return s.ctx.Err()
  272. case <-server.Context().Done():
  273. return server.Context().Err()
  274. case newStatus := <-subscription:
  275. err = server.Send(newStatus)
  276. if err != nil {
  277. return err
  278. }
  279. case <-done:
  280. return nil
  281. }
  282. }
  283. }
  284. func (s *StartedService) SubscribeLog(empty *emptypb.Empty, server grpc.ServerStreamingServer[Log]) error {
  285. var savedLines []*log.Entry
  286. s.logAccess.Lock()
  287. savedLines = make([]*log.Entry, 0, s.logLines.Len())
  288. for element := s.logLines.Front(); element != nil; element = element.Next() {
  289. savedLines = append(savedLines, element.Value)
  290. }
  291. subscription, done, err := s.logObserver.Subscribe()
  292. s.logAccess.Unlock()
  293. if err != nil {
  294. return err
  295. }
  296. defer s.logObserver.UnSubscribe(subscription)
  297. err = server.Send(&Log{
  298. Messages: common.Map(savedLines, func(it *log.Entry) *Log_Message {
  299. return &Log_Message{
  300. Level: LogLevel(it.Level),
  301. Message: it.Message,
  302. }
  303. }),
  304. Reset_: true,
  305. })
  306. if err != nil {
  307. return err
  308. }
  309. for {
  310. select {
  311. case <-s.ctx.Done():
  312. return s.ctx.Err()
  313. case <-server.Context().Done():
  314. return server.Context().Err()
  315. case message := <-subscription:
  316. var rawMessage Log
  317. if message == nil {
  318. rawMessage.Reset_ = true
  319. } else {
  320. rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
  321. Level: LogLevel(message.Level),
  322. Message: message.Message,
  323. })
  324. }
  325. fetch:
  326. for {
  327. select {
  328. case message = <-subscription:
  329. if message == nil {
  330. rawMessage.Messages = nil
  331. rawMessage.Reset_ = true
  332. } else {
  333. rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
  334. Level: LogLevel(message.Level),
  335. Message: message.Message,
  336. })
  337. }
  338. default:
  339. break fetch
  340. }
  341. }
  342. err = server.Send(&rawMessage)
  343. if err != nil {
  344. return err
  345. }
  346. case <-done:
  347. return nil
  348. }
  349. }
  350. }
  351. func (s *StartedService) GetDefaultLogLevel(ctx context.Context, empty *emptypb.Empty) (*DefaultLogLevel, error) {
  352. s.serviceAccess.RLock()
  353. switch s.serviceStatus.Status {
  354. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  355. default:
  356. s.serviceAccess.RUnlock()
  357. return nil, os.ErrInvalid
  358. }
  359. logLevel := s.instance.instance.LogFactory().Level()
  360. s.serviceAccess.RUnlock()
  361. return &DefaultLogLevel{Level: LogLevel(logLevel)}, nil
  362. }
// ClearLogs drops all buffered log lines via resetLogs, which also emits a
// reset marker to log subscribers. It always succeeds.
func (s *StartedService) ClearLogs(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
	s.resetLogs()
	return &emptypb.Empty{}, nil
}
  367. func (s *StartedService) SubscribeStatus(request *SubscribeStatusRequest, server grpc.ServerStreamingServer[Status]) error {
  368. interval := time.Duration(request.Interval)
  369. if interval <= 0 {
  370. interval = time.Second // Default to 1 second
  371. }
  372. ticker := time.NewTicker(interval)
  373. defer ticker.Stop()
  374. status := s.readStatus()
  375. uploadTotal := status.UplinkTotal
  376. downloadTotal := status.DownlinkTotal
  377. for {
  378. err := server.Send(status)
  379. if err != nil {
  380. return err
  381. }
  382. select {
  383. case <-s.ctx.Done():
  384. return s.ctx.Err()
  385. case <-server.Context().Done():
  386. return server.Context().Err()
  387. case <-ticker.C:
  388. }
  389. status = s.readStatus()
  390. upload := status.UplinkTotal - uploadTotal
  391. download := status.DownlinkTotal - downloadTotal
  392. uploadTotal = status.UplinkTotal
  393. downloadTotal = status.DownlinkTotal
  394. status.Uplink = upload
  395. status.Downlink = download
  396. }
  397. }
  398. func (s *StartedService) readStatus() *Status {
  399. var status Status
  400. status.Memory = memory.Total()
  401. status.Goroutines = int32(runtime.NumGoroutine())
  402. s.serviceAccess.RLock()
  403. nowService := s.instance
  404. s.serviceAccess.RUnlock()
  405. if nowService != nil && nowService.connectionManager != nil {
  406. status.ConnectionsOut = int32(nowService.connectionManager.Count())
  407. }
  408. if nowService != nil {
  409. if clashServer := nowService.clashServer; clashServer != nil {
  410. status.TrafficAvailable = true
  411. trafficManager := clashServer.(*clashapi.Server).TrafficManager()
  412. status.UplinkTotal, status.DownlinkTotal = trafficManager.Total()
  413. status.ConnectionsIn = int32(trafficManager.ConnectionsLen())
  414. }
  415. }
  416. return &status
  417. }
  418. func (s *StartedService) SubscribeGroups(empty *emptypb.Empty, server grpc.ServerStreamingServer[Groups]) error {
  419. err := s.waitForStarted(server.Context())
  420. if err != nil {
  421. return err
  422. }
  423. subscription, done, err := s.urlTestObserver.Subscribe()
  424. if err != nil {
  425. return err
  426. }
  427. defer s.urlTestObserver.UnSubscribe(subscription)
  428. for {
  429. s.serviceAccess.RLock()
  430. if s.serviceStatus.Status != ServiceStatus_STARTED {
  431. s.serviceAccess.RUnlock()
  432. return os.ErrInvalid
  433. }
  434. groups := s.readGroups()
  435. s.serviceAccess.RUnlock()
  436. err = server.Send(groups)
  437. if err != nil {
  438. return err
  439. }
  440. select {
  441. case <-subscription:
  442. case <-s.ctx.Done():
  443. return s.ctx.Err()
  444. case <-server.Context().Done():
  445. return server.Context().Err()
  446. case <-done:
  447. return nil
  448. }
  449. }
  450. }
  451. func (s *StartedService) readGroups() *Groups {
  452. historyStorage := s.instance.urlTestHistoryStorage
  453. boxService := s.instance
  454. outbounds := boxService.instance.Outbound().Outbounds()
  455. var iGroups []adapter.OutboundGroup
  456. for _, it := range outbounds {
  457. if group, isGroup := it.(adapter.OutboundGroup); isGroup {
  458. iGroups = append(iGroups, group)
  459. }
  460. }
  461. var gs Groups
  462. for _, iGroup := range iGroups {
  463. var g Group
  464. g.Tag = iGroup.Tag()
  465. g.Type = iGroup.Type()
  466. _, g.Selectable = iGroup.(*group.Selector)
  467. g.Selected = iGroup.Now()
  468. if boxService.cacheFile != nil {
  469. if isExpand, loaded := boxService.cacheFile.LoadGroupExpand(g.Tag); loaded {
  470. g.IsExpand = isExpand
  471. }
  472. }
  473. for _, itemTag := range iGroup.All() {
  474. itemOutbound, isLoaded := boxService.instance.Outbound().Outbound(itemTag)
  475. if !isLoaded {
  476. continue
  477. }
  478. var item GroupItem
  479. item.Tag = itemTag
  480. item.Type = itemOutbound.Type()
  481. if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(itemOutbound)); history != nil {
  482. item.UrlTestTime = history.Time.Unix()
  483. item.UrlTestDelay = int32(history.Delay)
  484. }
  485. g.Items = append(g.Items, &item)
  486. }
  487. if len(g.Items) < 2 {
  488. continue
  489. }
  490. gs.Group = append(gs.Group, &g)
  491. }
  492. return &gs
  493. }
  494. func (s *StartedService) GetClashModeStatus(ctx context.Context, empty *emptypb.Empty) (*ClashModeStatus, error) {
  495. s.serviceAccess.RLock()
  496. if s.serviceStatus.Status != ServiceStatus_STARTED {
  497. s.serviceAccess.RUnlock()
  498. return nil, os.ErrInvalid
  499. }
  500. clashServer := s.instance.clashServer
  501. s.serviceAccess.RUnlock()
  502. if clashServer == nil {
  503. return nil, os.ErrInvalid
  504. }
  505. return &ClashModeStatus{
  506. ModeList: clashServer.ModeList(),
  507. CurrentMode: clashServer.Mode(),
  508. }, nil
  509. }
  510. func (s *StartedService) SubscribeClashMode(empty *emptypb.Empty, server grpc.ServerStreamingServer[ClashMode]) error {
  511. err := s.waitForStarted(server.Context())
  512. if err != nil {
  513. return err
  514. }
  515. subscription, done, err := s.clashModeObserver.Subscribe()
  516. if err != nil {
  517. return err
  518. }
  519. defer s.clashModeObserver.UnSubscribe(subscription)
  520. for {
  521. s.serviceAccess.RLock()
  522. if s.serviceStatus.Status != ServiceStatus_STARTED {
  523. s.serviceAccess.RUnlock()
  524. return os.ErrInvalid
  525. }
  526. message := &ClashMode{Mode: s.instance.clashServer.Mode()}
  527. s.serviceAccess.RUnlock()
  528. err = server.Send(message)
  529. if err != nil {
  530. return err
  531. }
  532. select {
  533. case <-subscription:
  534. case <-s.ctx.Done():
  535. return s.ctx.Err()
  536. case <-server.Context().Done():
  537. return server.Context().Err()
  538. case <-done:
  539. return nil
  540. }
  541. }
  542. }
  543. func (s *StartedService) SetClashMode(ctx context.Context, request *ClashMode) (*emptypb.Empty, error) {
  544. s.serviceAccess.RLock()
  545. if s.serviceStatus.Status != ServiceStatus_STARTED {
  546. s.serviceAccess.RUnlock()
  547. return nil, os.ErrInvalid
  548. }
  549. clashServer := s.instance.clashServer
  550. s.serviceAccess.RUnlock()
  551. clashServer.(*clashapi.Server).SetMode(request.Mode)
  552. return &emptypb.Empty{}, nil
  553. }
  554. func (s *StartedService) URLTest(ctx context.Context, request *URLTestRequest) (*emptypb.Empty, error) {
  555. s.serviceAccess.RLock()
  556. if s.serviceStatus.Status != ServiceStatus_STARTED {
  557. s.serviceAccess.RUnlock()
  558. return nil, os.ErrInvalid
  559. }
  560. boxService := s.instance
  561. s.serviceAccess.RUnlock()
  562. groupTag := request.OutboundTag
  563. abstractOutboundGroup, isLoaded := boxService.instance.Outbound().Outbound(groupTag)
  564. if !isLoaded {
  565. return nil, E.New("outbound group not found: ", groupTag)
  566. }
  567. outboundGroup, isOutboundGroup := abstractOutboundGroup.(adapter.OutboundGroup)
  568. if !isOutboundGroup {
  569. return nil, E.New("outbound is not a group: ", groupTag)
  570. }
  571. urlTest, isURLTest := abstractOutboundGroup.(*group.URLTest)
  572. if isURLTest {
  573. go urlTest.CheckOutbounds()
  574. } else {
  575. historyStorage := boxService.urlTestHistoryStorage
  576. outbounds := common.Filter(common.Map(outboundGroup.All(), func(it string) adapter.Outbound {
  577. itOutbound, _ := boxService.instance.Outbound().Outbound(it)
  578. return itOutbound
  579. }), func(it adapter.Outbound) bool {
  580. if it == nil {
  581. return false
  582. }
  583. _, isGroup := it.(adapter.OutboundGroup)
  584. if isGroup {
  585. return false
  586. }
  587. return true
  588. })
  589. b, _ := batch.New(boxService.ctx, batch.WithConcurrencyNum[any](10))
  590. for _, detour := range outbounds {
  591. outboundToTest := detour
  592. outboundTag := outboundToTest.Tag()
  593. b.Go(outboundTag, func() (any, error) {
  594. t, err := urltest.URLTest(boxService.ctx, "", outboundToTest)
  595. if err != nil {
  596. historyStorage.DeleteURLTestHistory(outboundTag)
  597. } else {
  598. historyStorage.StoreURLTestHistory(outboundTag, &adapter.URLTestHistory{
  599. Time: time.Now(),
  600. Delay: t,
  601. })
  602. }
  603. return nil, nil
  604. })
  605. }
  606. }
  607. return &emptypb.Empty{}, nil
  608. }
  609. func (s *StartedService) SelectOutbound(ctx context.Context, request *SelectOutboundRequest) (*emptypb.Empty, error) {
  610. s.serviceAccess.RLock()
  611. switch s.serviceStatus.Status {
  612. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  613. default:
  614. s.serviceAccess.RUnlock()
  615. return nil, os.ErrInvalid
  616. }
  617. boxService := s.instance.instance
  618. s.serviceAccess.RUnlock()
  619. outboundGroup, isLoaded := boxService.Outbound().Outbound(request.GroupTag)
  620. if !isLoaded {
  621. return nil, E.New("selector not found: ", request.GroupTag)
  622. }
  623. selector, isSelector := outboundGroup.(*group.Selector)
  624. if !isSelector {
  625. return nil, E.New("outbound is not a selector: ", request.GroupTag)
  626. }
  627. if !selector.SelectOutbound(request.OutboundTag) {
  628. return nil, E.New("outbound not found in selector: ", request.OutboundTag)
  629. }
  630. s.urlTestObserver.Emit(struct{}{})
  631. return &emptypb.Empty{}, nil
  632. }
  633. func (s *StartedService) SetGroupExpand(ctx context.Context, request *SetGroupExpandRequest) (*emptypb.Empty, error) {
  634. s.serviceAccess.RLock()
  635. switch s.serviceStatus.Status {
  636. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  637. default:
  638. s.serviceAccess.RUnlock()
  639. return nil, os.ErrInvalid
  640. }
  641. boxService := s.instance
  642. s.serviceAccess.RUnlock()
  643. if boxService.cacheFile != nil {
  644. err := boxService.cacheFile.StoreGroupExpand(request.GroupTag, request.IsExpand)
  645. if err != nil {
  646. return nil, err
  647. }
  648. }
  649. return &emptypb.Empty{}, nil
  650. }
// GetSystemProxyStatus returns whatever the platform handler's
// SystemProxyStatus reports.
func (s *StartedService) GetSystemProxyStatus(ctx context.Context, empty *emptypb.Empty) (*SystemProxyStatus, error) {
	return s.handler.SystemProxyStatus()
}
  654. func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *SetSystemProxyEnabledRequest) (*emptypb.Empty, error) {
  655. err := s.handler.SetSystemProxyEnabled(request.Enabled)
  656. if err != nil {
  657. return nil, err
  658. }
  659. return &emptypb.Empty{}, nil
  660. }
// TriggerDebugCrash deliberately crashes the process for crash-report
// testing. It is refused with PermissionDenied unless the service was created
// with Debug set, and with InvalidArgument for a nil request or an unknown
// crash type.
func (s *StartedService) TriggerDebugCrash(ctx context.Context, request *DebugCrashRequest) (*emptypb.Empty, error) {
	if !s.debug {
		return nil, status.Error(codes.PermissionDenied, "debug crash trigger unavailable")
	}
	if request == nil {
		return nil, status.Error(codes.InvalidArgument, "missing debug crash request")
	}
	switch request.Type {
	case DebugCrashRequest_GO:
		// Write through a nil pointer after 200ms, i.e. after this handler
		// has returned.
		time.AfterFunc(200*time.Millisecond, func() {
			*(*int)(unsafe.Pointer(uintptr(0))) = 0
		})
	case DebugCrashRequest_NATIVE:
		err := s.handler.TriggerNativeCrash()
		if err != nil {
			return nil, err
		}
	default:
		return nil, status.Error(codes.InvalidArgument, "unknown debug crash type")
	}
	return &emptypb.Empty{}, nil
}
  683. func (s *StartedService) TriggerOOMReport(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
  684. instance := s.Instance()
  685. if instance == nil {
  686. return nil, status.Error(codes.FailedPrecondition, "service not started")
  687. }
  688. reporter := service.FromContext[oomkiller.OOMReporter](instance.ctx)
  689. if reporter == nil {
  690. return nil, status.Error(codes.Unavailable, "OOM reporter not available")
  691. }
  692. return &emptypb.Empty{}, reporter.WriteReport(memory.Total())
  693. }
  694. func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[ConnectionEvents]) error {
  695. err := s.waitForStarted(server.Context())
  696. if err != nil {
  697. return err
  698. }
  699. s.serviceAccess.RLock()
  700. boxService := s.instance
  701. s.serviceAccess.RUnlock()
  702. if boxService.clashServer == nil {
  703. return E.New("clash server not available")
  704. }
  705. trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
  706. subscription, done, err := s.connectionEventObserver.Subscribe()
  707. if err != nil {
  708. return err
  709. }
  710. defer s.connectionEventObserver.UnSubscribe(subscription)
  711. connectionSnapshots := make(map[uuid.UUID]connectionSnapshot)
  712. initialEvents := s.buildInitialConnectionState(trafficManager, connectionSnapshots)
  713. err = server.Send(&ConnectionEvents{
  714. Events: initialEvents,
  715. Reset_: true,
  716. })
  717. if err != nil {
  718. return err
  719. }
  720. interval := time.Duration(request.Interval)
  721. if interval <= 0 {
  722. interval = time.Second
  723. }
  724. ticker := time.NewTicker(interval)
  725. defer ticker.Stop()
  726. for {
  727. select {
  728. case <-s.ctx.Done():
  729. return s.ctx.Err()
  730. case <-server.Context().Done():
  731. return server.Context().Err()
  732. case <-done:
  733. return nil
  734. case event := <-subscription:
  735. var pendingEvents []*ConnectionEvent
  736. if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
  737. pendingEvents = append(pendingEvents, protoEvent)
  738. }
  739. drain:
  740. for {
  741. select {
  742. case event = <-subscription:
  743. if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
  744. pendingEvents = append(pendingEvents, protoEvent)
  745. }
  746. default:
  747. break drain
  748. }
  749. }
  750. if len(pendingEvents) > 0 {
  751. err = server.Send(&ConnectionEvents{Events: pendingEvents})
  752. if err != nil {
  753. return err
  754. }
  755. }
  756. case <-ticker.C:
  757. protoEvents := s.buildTrafficUpdates(trafficManager, connectionSnapshots)
  758. if len(protoEvents) == 0 {
  759. continue
  760. }
  761. err = server.Send(&ConnectionEvents{Events: protoEvents})
  762. if err != nil {
  763. return err
  764. }
  765. }
  766. }
  767. }
// connectionSnapshot is the per-connection state a SubscribeConnections
// stream keeps between updates.
type connectionSnapshot struct {
	// uplink and downlink hold the connection's Upload and Download counter
	// values at the time the snapshot was recorded.
	uplink   int64
	downlink int64
	// NOTE(review): hadTraffic is not assigned in the code visible here;
	// presumably maintained by the traffic-update path — confirm.
	hadTraffic bool
}
  773. func (s *StartedService) buildInitialConnectionState(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
  774. var events []*ConnectionEvent
  775. for _, metadata := range manager.Connections() {
  776. events = append(events, &ConnectionEvent{
  777. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  778. Id: metadata.ID.String(),
  779. Connection: buildConnectionProto(metadata),
  780. })
  781. snapshots[metadata.ID] = connectionSnapshot{
  782. uplink: metadata.Upload.Load(),
  783. downlink: metadata.Download.Load(),
  784. }
  785. }
  786. for _, metadata := range manager.ClosedConnections() {
  787. conn := buildConnectionProto(metadata)
  788. conn.ClosedAt = metadata.ClosedAt.UnixMilli()
  789. events = append(events, &ConnectionEvent{
  790. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  791. Id: metadata.ID.String(),
  792. Connection: conn,
  793. })
  794. }
  795. return events
  796. }
  797. func (s *StartedService) applyConnectionEvent(event trafficontrol.ConnectionEvent, snapshots map[uuid.UUID]connectionSnapshot) *ConnectionEvent {
  798. switch event.Type {
  799. case trafficontrol.ConnectionEventNew:
  800. if _, exists := snapshots[event.ID]; exists {
  801. return nil
  802. }
  803. snapshots[event.ID] = connectionSnapshot{
  804. uplink: event.Metadata.Upload.Load(),
  805. downlink: event.Metadata.Download.Load(),
  806. }
  807. return &ConnectionEvent{
  808. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  809. Id: event.ID.String(),
  810. Connection: buildConnectionProto(event.Metadata),
  811. }
  812. case trafficontrol.ConnectionEventClosed:
  813. delete(snapshots, event.ID)
  814. protoEvent := &ConnectionEvent{
  815. Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
  816. Id: event.ID.String(),
  817. }
  818. closedAt := event.ClosedAt
  819. if closedAt.IsZero() && !event.Metadata.ClosedAt.IsZero() {
  820. closedAt = event.Metadata.ClosedAt
  821. }
  822. if closedAt.IsZero() {
  823. closedAt = time.Now()
  824. }
  825. protoEvent.ClosedAt = closedAt.UnixMilli()
  826. if event.Metadata.ID != uuid.Nil {
  827. conn := buildConnectionProto(event.Metadata)
  828. conn.ClosedAt = protoEvent.ClosedAt
  829. protoEvent.Connection = conn
  830. }
  831. return protoEvent
  832. default:
  833. return nil
  834. }
  835. }
  836. func (s *StartedService) buildTrafficUpdates(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
  837. activeConnections := manager.Connections()
  838. activeIndex := make(map[uuid.UUID]*trafficontrol.TrackerMetadata, len(activeConnections))
  839. var events []*ConnectionEvent
  840. for _, metadata := range activeConnections {
  841. activeIndex[metadata.ID] = metadata
  842. currentUpload := metadata.Upload.Load()
  843. currentDownload := metadata.Download.Load()
  844. snapshot, exists := snapshots[metadata.ID]
  845. if !exists {
  846. snapshots[metadata.ID] = connectionSnapshot{
  847. uplink: currentUpload,
  848. downlink: currentDownload,
  849. }
  850. events = append(events, &ConnectionEvent{
  851. Type: ConnectionEventType_CONNECTION_EVENT_NEW,
  852. Id: metadata.ID.String(),
  853. Connection: buildConnectionProto(metadata),
  854. })
  855. continue
  856. }
  857. uplinkDelta := currentUpload - snapshot.uplink
  858. downlinkDelta := currentDownload - snapshot.downlink
  859. if uplinkDelta < 0 || downlinkDelta < 0 {
  860. if snapshot.hadTraffic {
  861. events = append(events, &ConnectionEvent{
  862. Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
  863. Id: metadata.ID.String(),
  864. UplinkDelta: 0,
  865. DownlinkDelta: 0,
  866. })
  867. }
  868. snapshot.uplink = currentUpload
  869. snapshot.downlink = currentDownload
  870. snapshot.hadTraffic = false
  871. snapshots[metadata.ID] = snapshot
  872. continue
  873. }
  874. if uplinkDelta > 0 || downlinkDelta > 0 {
  875. snapshot.uplink = currentUpload
  876. snapshot.downlink = currentDownload
  877. snapshot.hadTraffic = true
  878. snapshots[metadata.ID] = snapshot
  879. events = append(events, &ConnectionEvent{
  880. Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
  881. Id: metadata.ID.String(),
  882. UplinkDelta: uplinkDelta,
  883. DownlinkDelta: downlinkDelta,
  884. })
  885. continue
  886. }
  887. if snapshot.hadTraffic {
  888. snapshot.uplink = currentUpload
  889. snapshot.downlink = currentDownload
  890. snapshot.hadTraffic = false
  891. snapshots[metadata.ID] = snapshot
  892. events = append(events, &ConnectionEvent{
  893. Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
  894. Id: metadata.ID.String(),
  895. UplinkDelta: 0,
  896. DownlinkDelta: 0,
  897. })
  898. }
  899. }
  900. var closedIndex map[uuid.UUID]*trafficontrol.TrackerMetadata
  901. for id := range snapshots {
  902. if _, exists := activeIndex[id]; exists {
  903. continue
  904. }
  905. if closedIndex == nil {
  906. closedIndex = make(map[uuid.UUID]*trafficontrol.TrackerMetadata)
  907. for _, metadata := range manager.ClosedConnections() {
  908. closedIndex[metadata.ID] = metadata
  909. }
  910. }
  911. closedAt := time.Now()
  912. var conn *Connection
  913. if metadata, ok := closedIndex[id]; ok {
  914. if !metadata.ClosedAt.IsZero() {
  915. closedAt = metadata.ClosedAt
  916. }
  917. conn = buildConnectionProto(metadata)
  918. conn.ClosedAt = closedAt.UnixMilli()
  919. }
  920. events = append(events, &ConnectionEvent{
  921. Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
  922. Id: id.String(),
  923. ClosedAt: closedAt.UnixMilli(),
  924. Connection: conn,
  925. })
  926. delete(snapshots, id)
  927. }
  928. return events
  929. }
  930. func buildConnectionProto(metadata *trafficontrol.TrackerMetadata) *Connection {
  931. var rule string
  932. if metadata.Rule != nil {
  933. rule = metadata.Rule.String()
  934. }
  935. uplinkTotal := metadata.Upload.Load()
  936. downlinkTotal := metadata.Download.Load()
  937. var processInfo *ProcessInfo
  938. if metadata.Metadata.ProcessInfo != nil {
  939. processInfo = &ProcessInfo{
  940. ProcessId: metadata.Metadata.ProcessInfo.ProcessID,
  941. UserId: metadata.Metadata.ProcessInfo.UserId,
  942. UserName: metadata.Metadata.ProcessInfo.UserName,
  943. ProcessPath: metadata.Metadata.ProcessInfo.ProcessPath,
  944. PackageNames: metadata.Metadata.ProcessInfo.AndroidPackageNames,
  945. }
  946. }
  947. return &Connection{
  948. Id: metadata.ID.String(),
  949. Inbound: metadata.Metadata.Inbound,
  950. InboundType: metadata.Metadata.InboundType,
  951. IpVersion: int32(metadata.Metadata.IPVersion),
  952. Network: metadata.Metadata.Network,
  953. Source: metadata.Metadata.Source.String(),
  954. Destination: metadata.Metadata.Destination.String(),
  955. Domain: metadata.Metadata.Domain,
  956. Protocol: metadata.Metadata.Protocol,
  957. User: metadata.Metadata.User,
  958. FromOutbound: metadata.Metadata.Outbound,
  959. CreatedAt: metadata.CreatedAt.UnixMilli(),
  960. UplinkTotal: uplinkTotal,
  961. DownlinkTotal: downlinkTotal,
  962. Rule: rule,
  963. Outbound: metadata.Outbound,
  964. OutboundType: metadata.OutboundType,
  965. ChainList: metadata.Chain,
  966. ProcessInfo: processInfo,
  967. }
  968. }
  969. func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {
  970. s.serviceAccess.RLock()
  971. switch s.serviceStatus.Status {
  972. case ServiceStatus_STARTING, ServiceStatus_STARTED:
  973. default:
  974. s.serviceAccess.RUnlock()
  975. return nil, os.ErrInvalid
  976. }
  977. boxService := s.instance
  978. s.serviceAccess.RUnlock()
  979. targetConn := boxService.clashServer.(*clashapi.Server).TrafficManager().Connection(uuid.FromStringOrNil(request.Id))
  980. if targetConn != nil {
  981. targetConn.Close()
  982. }
  983. return &emptypb.Empty{}, nil
  984. }
  985. func (s *StartedService) CloseAllConnections(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
  986. s.serviceAccess.RLock()
  987. nowService := s.instance
  988. s.serviceAccess.RUnlock()
  989. if nowService != nil && nowService.connectionManager != nil {
  990. nowService.connectionManager.CloseAll()
  991. }
  992. return &emptypb.Empty{}, nil
  993. }
  994. func (s *StartedService) GetDeprecatedWarnings(ctx context.Context, empty *emptypb.Empty) (*DeprecatedWarnings, error) {
  995. s.serviceAccess.RLock()
  996. if s.serviceStatus.Status != ServiceStatus_STARTED {
  997. s.serviceAccess.RUnlock()
  998. return nil, os.ErrInvalid
  999. }
  1000. boxService := s.instance
  1001. s.serviceAccess.RUnlock()
  1002. notes := service.FromContext[deprecated.Manager](boxService.ctx).(*deprecatedManager).Get()
  1003. return &DeprecatedWarnings{
  1004. Warnings: common.Map(notes, func(it deprecated.Note) *DeprecatedWarning {
  1005. return &DeprecatedWarning{
  1006. Message: it.Message(),
  1007. Impending: it.Impending(),
  1008. MigrationLink: it.MigrationLink,
  1009. Description: it.Description,
  1010. DeprecatedVersion: it.DeprecatedVersion,
  1011. ScheduledVersion: it.ScheduledVersion,
  1012. }
  1013. }),
  1014. }, nil
  1015. }
  1016. func (s *StartedService) GetStartedAt(ctx context.Context, empty *emptypb.Empty) (*StartedAt, error) {
  1017. s.serviceAccess.RLock()
  1018. defer s.serviceAccess.RUnlock()
  1019. return &StartedAt{StartedAt: s.startedAt.UnixMilli()}, nil
  1020. }
  1021. func (s *StartedService) SubscribeOutbounds(_ *emptypb.Empty, server grpc.ServerStreamingServer[OutboundList]) error {
  1022. err := s.waitForStarted(server.Context())
  1023. if err != nil {
  1024. return err
  1025. }
  1026. subscription, done, err := s.urlTestObserver.Subscribe()
  1027. if err != nil {
  1028. return err
  1029. }
  1030. defer s.urlTestObserver.UnSubscribe(subscription)
  1031. for {
  1032. s.serviceAccess.RLock()
  1033. if s.serviceStatus.Status != ServiceStatus_STARTED {
  1034. s.serviceAccess.RUnlock()
  1035. return os.ErrInvalid
  1036. }
  1037. boxService := s.instance
  1038. s.serviceAccess.RUnlock()
  1039. historyStorage := boxService.urlTestHistoryStorage
  1040. var list OutboundList
  1041. for _, ob := range boxService.instance.Outbound().Outbounds() {
  1042. item := &GroupItem{
  1043. Tag: ob.Tag(),
  1044. Type: ob.Type(),
  1045. }
  1046. if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(ob)); history != nil {
  1047. item.UrlTestTime = history.Time.Unix()
  1048. item.UrlTestDelay = int32(history.Delay)
  1049. }
  1050. list.Outbounds = append(list.Outbounds, item)
  1051. }
  1052. for _, ep := range boxService.instance.Endpoint().Endpoints() {
  1053. item := &GroupItem{
  1054. Tag: ep.Tag(),
  1055. Type: ep.Type(),
  1056. }
  1057. if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(ep)); history != nil {
  1058. item.UrlTestTime = history.Time.Unix()
  1059. item.UrlTestDelay = int32(history.Delay)
  1060. }
  1061. list.Outbounds = append(list.Outbounds, item)
  1062. }
  1063. err = server.Send(&list)
  1064. if err != nil {
  1065. return err
  1066. }
  1067. select {
  1068. case <-subscription:
  1069. case <-s.ctx.Done():
  1070. return s.ctx.Err()
  1071. case <-server.Context().Done():
  1072. return server.Context().Err()
  1073. case <-done:
  1074. return nil
  1075. }
  1076. }
  1077. }
  1078. func resolveOutbound(instance *Instance, tag string) (adapter.Outbound, error) {
  1079. if tag == "" {
  1080. return instance.instance.Outbound().Default(), nil
  1081. }
  1082. outbound, loaded := instance.instance.Outbound().Outbound(tag)
  1083. if !loaded {
  1084. return nil, E.New("outbound not found: ", tag)
  1085. }
  1086. return outbound, nil
  1087. }
  1088. func (s *StartedService) StartNetworkQualityTest(
  1089. request *NetworkQualityTestRequest,
  1090. server grpc.ServerStreamingServer[NetworkQualityTestProgress],
  1091. ) error {
  1092. err := s.waitForStarted(server.Context())
  1093. if err != nil {
  1094. return err
  1095. }
  1096. s.serviceAccess.RLock()
  1097. boxService := s.instance
  1098. s.serviceAccess.RUnlock()
  1099. outbound, err := resolveOutbound(boxService, request.OutboundTag)
  1100. if err != nil {
  1101. return err
  1102. }
  1103. resolvedDialer := dialer.NewResolveDialer(boxService.ctx, outbound, true, "", adapter.DNSQueryOptions{}, 0)
  1104. httpClient := networkquality.NewHTTPClient(resolvedDialer)
  1105. defer httpClient.CloseIdleConnections()
  1106. measurementClientFactory, err := networkquality.NewOptionalHTTP3Factory(resolvedDialer, request.Http3)
  1107. if err != nil {
  1108. return err
  1109. }
  1110. result, nqErr := networkquality.Run(networkquality.Options{
  1111. ConfigURL: request.ConfigURL,
  1112. HTTPClient: httpClient,
  1113. NewMeasurementClient: measurementClientFactory,
  1114. Serial: request.Serial,
  1115. MaxRuntime: time.Duration(request.MaxRuntimeSeconds) * time.Second,
  1116. Context: server.Context(),
  1117. OnProgress: func(p networkquality.Progress) {
  1118. _ = server.Send(&NetworkQualityTestProgress{
  1119. Phase: int32(p.Phase),
  1120. DownloadCapacity: p.DownloadCapacity,
  1121. UploadCapacity: p.UploadCapacity,
  1122. DownloadRPM: p.DownloadRPM,
  1123. UploadRPM: p.UploadRPM,
  1124. IdleLatencyMs: p.IdleLatencyMs,
  1125. ElapsedMs: p.ElapsedMs,
  1126. DownloadCapacityAccuracy: int32(p.DownloadCapacityAccuracy),
  1127. UploadCapacityAccuracy: int32(p.UploadCapacityAccuracy),
  1128. DownloadRPMAccuracy: int32(p.DownloadRPMAccuracy),
  1129. UploadRPMAccuracy: int32(p.UploadRPMAccuracy),
  1130. })
  1131. },
  1132. })
  1133. if nqErr != nil {
  1134. return server.Send(&NetworkQualityTestProgress{
  1135. IsFinal: true,
  1136. Error: nqErr.Error(),
  1137. })
  1138. }
  1139. return server.Send(&NetworkQualityTestProgress{
  1140. Phase: int32(networkquality.PhaseDone),
  1141. DownloadCapacity: result.DownloadCapacity,
  1142. UploadCapacity: result.UploadCapacity,
  1143. DownloadRPM: result.DownloadRPM,
  1144. UploadRPM: result.UploadRPM,
  1145. IdleLatencyMs: result.IdleLatencyMs,
  1146. IsFinal: true,
  1147. DownloadCapacityAccuracy: int32(result.DownloadCapacityAccuracy),
  1148. UploadCapacityAccuracy: int32(result.UploadCapacityAccuracy),
  1149. DownloadRPMAccuracy: int32(result.DownloadRPMAccuracy),
  1150. UploadRPMAccuracy: int32(result.UploadRPMAccuracy),
  1151. })
  1152. }
  1153. func (s *StartedService) StartSTUNTest(
  1154. request *STUNTestRequest,
  1155. server grpc.ServerStreamingServer[STUNTestProgress],
  1156. ) error {
  1157. err := s.waitForStarted(server.Context())
  1158. if err != nil {
  1159. return err
  1160. }
  1161. s.serviceAccess.RLock()
  1162. boxService := s.instance
  1163. s.serviceAccess.RUnlock()
  1164. outbound, err := resolveOutbound(boxService, request.OutboundTag)
  1165. if err != nil {
  1166. return err
  1167. }
  1168. resolvedDialer := dialer.NewResolveDialer(boxService.ctx, outbound, true, "", adapter.DNSQueryOptions{}, 0)
  1169. result, stunErr := stun.Run(stun.Options{
  1170. Server: request.Server,
  1171. Dialer: resolvedDialer,
  1172. Context: server.Context(),
  1173. OnProgress: func(p stun.Progress) {
  1174. _ = server.Send(&STUNTestProgress{
  1175. Phase: int32(p.Phase),
  1176. ExternalAddr: p.ExternalAddr,
  1177. LatencyMs: p.LatencyMs,
  1178. NatMapping: int32(p.NATMapping),
  1179. NatFiltering: int32(p.NATFiltering),
  1180. })
  1181. },
  1182. })
  1183. if stunErr != nil {
  1184. return server.Send(&STUNTestProgress{
  1185. IsFinal: true,
  1186. Error: stunErr.Error(),
  1187. })
  1188. }
  1189. return server.Send(&STUNTestProgress{
  1190. Phase: int32(stun.PhaseDone),
  1191. ExternalAddr: result.ExternalAddr,
  1192. LatencyMs: result.LatencyMs,
  1193. NatMapping: int32(result.NATMapping),
  1194. NatFiltering: int32(result.NATFiltering),
  1195. IsFinal: true,
  1196. NatTypeSupported: result.NATTypeSupported,
  1197. })
  1198. }
  1199. func (s *StartedService) SubscribeTailscaleStatus(
  1200. _ *emptypb.Empty,
  1201. server grpc.ServerStreamingServer[TailscaleStatusUpdate],
  1202. ) error {
  1203. err := s.waitForStarted(server.Context())
  1204. if err != nil {
  1205. return err
  1206. }
  1207. s.serviceAccess.RLock()
  1208. boxService := s.instance
  1209. s.serviceAccess.RUnlock()
  1210. endpointManager := service.FromContext[adapter.EndpointManager](boxService.ctx)
  1211. if endpointManager == nil {
  1212. return status.Error(codes.FailedPrecondition, "endpoint manager not available")
  1213. }
  1214. type tailscaleEndpoint struct {
  1215. tag string
  1216. provider adapter.TailscaleEndpoint
  1217. }
  1218. var endpoints []tailscaleEndpoint
  1219. for _, endpoint := range endpointManager.Endpoints() {
  1220. if endpoint.Type() != C.TypeTailscale {
  1221. continue
  1222. }
  1223. provider, loaded := endpoint.(adapter.TailscaleEndpoint)
  1224. if !loaded {
  1225. continue
  1226. }
  1227. endpoints = append(endpoints, tailscaleEndpoint{
  1228. tag: endpoint.Tag(),
  1229. provider: provider,
  1230. })
  1231. }
  1232. if len(endpoints) == 0 {
  1233. return status.Error(codes.NotFound, "no Tailscale endpoint found")
  1234. }
  1235. type taggedStatus struct {
  1236. tag string
  1237. status *adapter.TailscaleEndpointStatus
  1238. }
  1239. updates := make(chan taggedStatus, len(endpoints))
  1240. ctx, cancel := context.WithCancel(server.Context())
  1241. defer cancel()
  1242. var waitGroup sync.WaitGroup
  1243. for _, endpoint := range endpoints {
  1244. waitGroup.Add(1)
  1245. go func(tag string, provider adapter.TailscaleEndpoint) {
  1246. defer waitGroup.Done()
  1247. _ = provider.SubscribeTailscaleStatus(ctx, func(endpointStatus *adapter.TailscaleEndpointStatus) {
  1248. select {
  1249. case updates <- taggedStatus{tag: tag, status: endpointStatus}:
  1250. case <-ctx.Done():
  1251. }
  1252. })
  1253. }(endpoint.tag, endpoint.provider)
  1254. }
  1255. go func() {
  1256. waitGroup.Wait()
  1257. close(updates)
  1258. }()
  1259. var tags []string
  1260. statuses := make(map[string]*adapter.TailscaleEndpointStatus, len(endpoints))
  1261. for update := range updates {
  1262. if _, exists := statuses[update.tag]; !exists {
  1263. tags = append(tags, update.tag)
  1264. }
  1265. statuses[update.tag] = update.status
  1266. protoEndpoints := make([]*TailscaleEndpointStatus, 0, len(statuses))
  1267. for _, tag := range tags {
  1268. protoEndpoints = append(protoEndpoints, tailscaleEndpointStatusToProto(tag, statuses[tag]))
  1269. }
  1270. sendErr := server.Send(&TailscaleStatusUpdate{
  1271. Endpoints: protoEndpoints,
  1272. })
  1273. if sendErr != nil {
  1274. return sendErr
  1275. }
  1276. }
  1277. return nil
  1278. }
  1279. func tailscaleEndpointStatusToProto(tag string, s *adapter.TailscaleEndpointStatus) *TailscaleEndpointStatus {
  1280. userGroups := make([]*TailscaleUserGroup, len(s.UserGroups))
  1281. for i, group := range s.UserGroups {
  1282. peers := make([]*TailscalePeer, len(group.Peers))
  1283. for j, peer := range group.Peers {
  1284. peers[j] = tailscalePeerToProto(peer)
  1285. }
  1286. userGroups[i] = &TailscaleUserGroup{
  1287. UserID: group.UserID,
  1288. LoginName: group.LoginName,
  1289. DisplayName: group.DisplayName,
  1290. ProfilePicURL: group.ProfilePicURL,
  1291. Peers: peers,
  1292. }
  1293. }
  1294. result := &TailscaleEndpointStatus{
  1295. EndpointTag: tag,
  1296. BackendState: s.BackendState,
  1297. AuthURL: s.AuthURL,
  1298. NetworkName: s.NetworkName,
  1299. MagicDNSSuffix: s.MagicDNSSuffix,
  1300. UserGroups: userGroups,
  1301. }
  1302. if s.Self != nil {
  1303. result.Self = tailscalePeerToProto(s.Self)
  1304. }
  1305. return result
  1306. }
  1307. func tailscalePeerToProto(peer *adapter.TailscalePeer) *TailscalePeer {
  1308. return &TailscalePeer{
  1309. HostName: peer.HostName,
  1310. DnsName: peer.DNSName,
  1311. Os: peer.OS,
  1312. TailscaleIPs: peer.TailscaleIPs,
  1313. Online: peer.Online,
  1314. ExitNode: peer.ExitNode,
  1315. ExitNodeOption: peer.ExitNodeOption,
  1316. Active: peer.Active,
  1317. RxBytes: peer.RxBytes,
  1318. TxBytes: peer.TxBytes,
  1319. KeyExpiry: peer.KeyExpiry,
  1320. }
  1321. }
  1322. func (s *StartedService) StartTailscalePing(
  1323. request *TailscalePingRequest,
  1324. server grpc.ServerStreamingServer[TailscalePingResponse],
  1325. ) error {
  1326. err := s.waitForStarted(server.Context())
  1327. if err != nil {
  1328. return err
  1329. }
  1330. s.serviceAccess.RLock()
  1331. boxService := s.instance
  1332. s.serviceAccess.RUnlock()
  1333. endpointManager := service.FromContext[adapter.EndpointManager](boxService.ctx)
  1334. if endpointManager == nil {
  1335. return status.Error(codes.FailedPrecondition, "endpoint manager not available")
  1336. }
  1337. var provider adapter.TailscaleEndpoint
  1338. if request.EndpointTag != "" {
  1339. endpoint, loaded := endpointManager.Get(request.EndpointTag)
  1340. if !loaded {
  1341. return status.Error(codes.NotFound, "endpoint not found: "+request.EndpointTag)
  1342. }
  1343. if endpoint.Type() != C.TypeTailscale {
  1344. return status.Error(codes.InvalidArgument, "endpoint is not Tailscale: "+request.EndpointTag)
  1345. }
  1346. pingProvider, loaded := endpoint.(adapter.TailscaleEndpoint)
  1347. if !loaded {
  1348. return status.Error(codes.FailedPrecondition, "endpoint does not support ping")
  1349. }
  1350. provider = pingProvider
  1351. } else {
  1352. for _, endpoint := range endpointManager.Endpoints() {
  1353. if endpoint.Type() != C.TypeTailscale {
  1354. continue
  1355. }
  1356. pingProvider, loaded := endpoint.(adapter.TailscaleEndpoint)
  1357. if loaded {
  1358. provider = pingProvider
  1359. break
  1360. }
  1361. }
  1362. if provider == nil {
  1363. return status.Error(codes.NotFound, "no Tailscale endpoint found")
  1364. }
  1365. }
  1366. return provider.StartTailscalePing(server.Context(), request.PeerIP, func(result *adapter.TailscalePingResult) {
  1367. _ = server.Send(&TailscalePingResponse{
  1368. LatencyMs: result.LatencyMs,
  1369. IsDirect: result.IsDirect,
  1370. Endpoint: result.Endpoint,
  1371. DerpRegionID: result.DERPRegionID,
  1372. DerpRegionCode: result.DERPRegionCode,
  1373. Error: result.Error,
  1374. })
  1375. })
  1376. }
  1377. func (s *StartedService) mustEmbedUnimplementedStartedServiceServer() {
  1378. }
  1379. func (s *StartedService) WriteMessage(level log.Level, message string) {
  1380. item := &log.Entry{Level: level, Message: message}
  1381. s.logAccess.Lock()
  1382. s.logLines.PushBack(item)
  1383. if s.logLines.Len() > s.logMaxLines {
  1384. s.logLines.Remove(s.logLines.Front())
  1385. }
  1386. s.logAccess.Unlock()
  1387. s.logSubscriber.Emit(item)
  1388. if s.debug {
  1389. s.handler.WriteDebugMessage(message)
  1390. }
  1391. }
  1392. func (s *StartedService) Instance() *Instance {
  1393. s.serviceAccess.RLock()
  1394. defer s.serviceAccess.RUnlock()
  1395. return s.instance
  1396. }