| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489 |
- package daemon
- import (
- "context"
- "os"
- "runtime"
- "sync"
- "time"
- "unsafe"
- "github.com/sagernet/sing-box/adapter"
- "github.com/sagernet/sing-box/common/dialer"
- "github.com/sagernet/sing-box/common/networkquality"
- "github.com/sagernet/sing-box/common/stun"
- "github.com/sagernet/sing-box/common/urltest"
- C "github.com/sagernet/sing-box/constant"
- "github.com/sagernet/sing-box/experimental/clashapi"
- "github.com/sagernet/sing-box/experimental/clashapi/trafficontrol"
- "github.com/sagernet/sing-box/experimental/deprecated"
- "github.com/sagernet/sing-box/log"
- "github.com/sagernet/sing-box/protocol/group"
- "github.com/sagernet/sing-box/service/oomkiller"
- "github.com/sagernet/sing/common"
- "github.com/sagernet/sing/common/batch"
- E "github.com/sagernet/sing/common/exceptions"
- "github.com/sagernet/sing/common/memory"
- "github.com/sagernet/sing/common/observable"
- "github.com/sagernet/sing/common/x/list"
- "github.com/sagernet/sing/service"
- "github.com/gofrs/uuid/v5"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "google.golang.org/protobuf/types/known/emptypb"
- )
- var _ StartedServiceServer = (*StartedService)(nil)
- type StartedService struct {
- ctx context.Context
- // platform adapter.PlatformInterface
- handler PlatformHandler
- debug bool
- logMaxLines int
- oomKillerEnabled bool
- oomKillerDisabled bool
- oomMemoryLimit uint64
- // workingDirectory string
- // tempDirectory string
- // userID int
- // groupID int
- // systemProxyEnabled bool
- serviceAccess sync.RWMutex
- serviceStatus *ServiceStatus
- serviceStatusSubscriber *observable.Subscriber[*ServiceStatus]
- serviceStatusObserver *observable.Observer[*ServiceStatus]
- logAccess sync.RWMutex
- logLines list.List[*log.Entry]
- logSubscriber *observable.Subscriber[*log.Entry]
- logObserver *observable.Observer[*log.Entry]
- instance *Instance
- startedAt time.Time
- urlTestSubscriber *observable.Subscriber[struct{}]
- urlTestObserver *observable.Observer[struct{}]
- urlTestHistoryStorage *urltest.HistoryStorage
- clashModeSubscriber *observable.Subscriber[struct{}]
- clashModeObserver *observable.Observer[struct{}]
- connectionEventSubscriber *observable.Subscriber[trafficontrol.ConnectionEvent]
- connectionEventObserver *observable.Observer[trafficontrol.ConnectionEvent]
- }
- type ServiceOptions struct {
- Context context.Context
- // Platform adapter.PlatformInterface
- Handler PlatformHandler
- Debug bool
- LogMaxLines int
- OOMKillerEnabled bool
- OOMKillerDisabled bool
- OOMMemoryLimit uint64
- // WorkingDirectory string
- // TempDirectory string
- // UserID int
- // GroupID int
- // SystemProxyEnabled bool
- }
- func NewStartedService(options ServiceOptions) *StartedService {
- s := &StartedService{
- ctx: options.Context,
- // platform: options.Platform,
- handler: options.Handler,
- debug: options.Debug,
- logMaxLines: options.LogMaxLines,
- oomKillerEnabled: options.OOMKillerEnabled,
- oomKillerDisabled: options.OOMKillerDisabled,
- oomMemoryLimit: options.OOMMemoryLimit,
- // workingDirectory: options.WorkingDirectory,
- // tempDirectory: options.TempDirectory,
- // userID: options.UserID,
- // groupID: options.GroupID,
- // systemProxyEnabled: options.SystemProxyEnabled,
- serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
- serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
- logSubscriber: observable.NewSubscriber[*log.Entry](128),
- urlTestSubscriber: observable.NewSubscriber[struct{}](1),
- urlTestHistoryStorage: urltest.NewHistoryStorage(),
- clashModeSubscriber: observable.NewSubscriber[struct{}](1),
- connectionEventSubscriber: observable.NewSubscriber[trafficontrol.ConnectionEvent](256),
- }
- s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
- s.logObserver = observable.NewObserver(s.logSubscriber, 64)
- s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
- s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
- s.connectionEventObserver = observable.NewObserver(s.connectionEventSubscriber, 64)
- return s
- }
- func (s *StartedService) resetLogs() {
- s.logAccess.Lock()
- s.logLines = list.List[*log.Entry]{}
- s.logAccess.Unlock()
- s.logSubscriber.Emit(nil)
- }
- func (s *StartedService) updateStatus(newStatus ServiceStatus_Type) {
- statusObject := &ServiceStatus{Status: newStatus}
- s.serviceStatusSubscriber.Emit(statusObject)
- s.serviceStatus = statusObject
- }
- func (s *StartedService) updateStatusError(err error) error {
- statusObject := &ServiceStatus{Status: ServiceStatus_FATAL, ErrorMessage: err.Error()}
- s.serviceStatusSubscriber.Emit(statusObject)
- s.serviceStatus = statusObject
- s.serviceAccess.Unlock()
- return err
- }
- func (s *StartedService) waitForStarted(ctx context.Context) error {
- s.serviceAccess.RLock()
- currentStatus := s.serviceStatus.Status
- s.serviceAccess.RUnlock()
- switch currentStatus {
- case ServiceStatus_STARTED:
- return nil
- case ServiceStatus_STARTING:
- default:
- return os.ErrInvalid
- }
- subscription, done, err := s.serviceStatusObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.serviceStatusObserver.UnSubscribe(subscription)
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-s.ctx.Done():
- return s.ctx.Err()
- case status := <-subscription:
- switch status.Status {
- case ServiceStatus_STARTED:
- return nil
- case ServiceStatus_FATAL:
- return E.New(status.ErrorMessage)
- case ServiceStatus_IDLE, ServiceStatus_STOPPING:
- return os.ErrInvalid
- }
- case <-done:
- return os.ErrClosed
- }
- }
- }
- func (s *StartedService) StartOrReloadService(profileContent string, options *OverrideOptions) error {
- s.serviceAccess.Lock()
- switch s.serviceStatus.Status {
- case ServiceStatus_IDLE, ServiceStatus_STARTED, ServiceStatus_STARTING, ServiceStatus_FATAL:
- default:
- s.serviceAccess.Unlock()
- return os.ErrInvalid
- }
- oldInstance := s.instance
- if oldInstance != nil {
- s.updateStatus(ServiceStatus_STOPPING)
- s.serviceAccess.Unlock()
- _ = oldInstance.Close()
- s.serviceAccess.Lock()
- }
- s.updateStatus(ServiceStatus_STARTING)
- s.resetLogs()
- instance, err := s.newInstance(profileContent, options)
- if err != nil {
- return s.updateStatusError(err)
- }
- s.instance = instance
- instance.urlTestHistoryStorage.SetHook(s.urlTestSubscriber)
- if instance.clashServer != nil {
- instance.clashServer.SetModeUpdateHook(s.clashModeSubscriber)
- instance.clashServer.(*clashapi.Server).TrafficManager().SetEventHook(s.connectionEventSubscriber)
- }
- s.serviceAccess.Unlock()
- err = instance.Start()
- s.serviceAccess.Lock()
- if s.serviceStatus.Status != ServiceStatus_STARTING {
- s.serviceAccess.Unlock()
- return nil
- }
- if err != nil {
- return s.updateStatusError(err)
- }
- s.startedAt = time.Now()
- s.updateStatus(ServiceStatus_STARTED)
- s.serviceAccess.Unlock()
- runtime.GC()
- return nil
- }
- func (s *StartedService) Close() {
- s.serviceStatusSubscriber.Close()
- s.logSubscriber.Close()
- s.urlTestSubscriber.Close()
- s.clashModeSubscriber.Close()
- s.connectionEventSubscriber.Close()
- }
- func (s *StartedService) CloseService() error {
- s.serviceAccess.Lock()
- switch s.serviceStatus.Status {
- case ServiceStatus_STARTING, ServiceStatus_STARTED:
- default:
- s.serviceAccess.Unlock()
- return os.ErrInvalid
- }
- s.updateStatus(ServiceStatus_STOPPING)
- instance := s.instance
- s.instance = nil
- if instance != nil {
- err := instance.Close()
- if err != nil {
- return s.updateStatusError(err)
- }
- }
- s.startedAt = time.Time{}
- s.updateStatus(ServiceStatus_IDLE)
- s.serviceAccess.Unlock()
- runtime.GC()
- return nil
- }
- func (s *StartedService) SetError(err error) {
- s.serviceAccess.Lock()
- s.updateStatusError(err)
- s.WriteMessage(log.LevelError, err.Error())
- }
- func (s *StartedService) StopService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
- err := s.handler.ServiceStop()
- if err != nil {
- return nil, err
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) ReloadService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
- err := s.handler.ServiceReload()
- if err != nil {
- return nil, err
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) SubscribeServiceStatus(empty *emptypb.Empty, server grpc.ServerStreamingServer[ServiceStatus]) error {
- subscription, done, err := s.serviceStatusObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.serviceStatusObserver.UnSubscribe(subscription)
- err = server.Send(s.serviceStatus)
- if err != nil {
- return err
- }
- for {
- select {
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case newStatus := <-subscription:
- err = server.Send(newStatus)
- if err != nil {
- return err
- }
- case <-done:
- return nil
- }
- }
- }
- func (s *StartedService) SubscribeLog(empty *emptypb.Empty, server grpc.ServerStreamingServer[Log]) error {
- var savedLines []*log.Entry
- s.logAccess.Lock()
- savedLines = make([]*log.Entry, 0, s.logLines.Len())
- for element := s.logLines.Front(); element != nil; element = element.Next() {
- savedLines = append(savedLines, element.Value)
- }
- subscription, done, err := s.logObserver.Subscribe()
- s.logAccess.Unlock()
- if err != nil {
- return err
- }
- defer s.logObserver.UnSubscribe(subscription)
- err = server.Send(&Log{
- Messages: common.Map(savedLines, func(it *log.Entry) *Log_Message {
- return &Log_Message{
- Level: LogLevel(it.Level),
- Message: it.Message,
- }
- }),
- Reset_: true,
- })
- if err != nil {
- return err
- }
- for {
- select {
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case message := <-subscription:
- var rawMessage Log
- if message == nil {
- rawMessage.Reset_ = true
- } else {
- rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
- Level: LogLevel(message.Level),
- Message: message.Message,
- })
- }
- fetch:
- for {
- select {
- case message = <-subscription:
- if message == nil {
- rawMessage.Messages = nil
- rawMessage.Reset_ = true
- } else {
- rawMessage.Messages = append(rawMessage.Messages, &Log_Message{
- Level: LogLevel(message.Level),
- Message: message.Message,
- })
- }
- default:
- break fetch
- }
- }
- err = server.Send(&rawMessage)
- if err != nil {
- return err
- }
- case <-done:
- return nil
- }
- }
- }
- func (s *StartedService) GetDefaultLogLevel(ctx context.Context, empty *emptypb.Empty) (*DefaultLogLevel, error) {
- s.serviceAccess.RLock()
- switch s.serviceStatus.Status {
- case ServiceStatus_STARTING, ServiceStatus_STARTED:
- default:
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- logLevel := s.instance.instance.LogFactory().Level()
- s.serviceAccess.RUnlock()
- return &DefaultLogLevel{Level: LogLevel(logLevel)}, nil
- }
- func (s *StartedService) ClearLogs(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
- s.resetLogs()
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) SubscribeStatus(request *SubscribeStatusRequest, server grpc.ServerStreamingServer[Status]) error {
- interval := time.Duration(request.Interval)
- if interval <= 0 {
- interval = time.Second // Default to 1 second
- }
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- status := s.readStatus()
- uploadTotal := status.UplinkTotal
- downloadTotal := status.DownlinkTotal
- for {
- err := server.Send(status)
- if err != nil {
- return err
- }
- select {
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case <-ticker.C:
- }
- status = s.readStatus()
- upload := status.UplinkTotal - uploadTotal
- download := status.DownlinkTotal - downloadTotal
- uploadTotal = status.UplinkTotal
- downloadTotal = status.DownlinkTotal
- status.Uplink = upload
- status.Downlink = download
- }
- }
- func (s *StartedService) readStatus() *Status {
- var status Status
- status.Memory = memory.Total()
- status.Goroutines = int32(runtime.NumGoroutine())
- s.serviceAccess.RLock()
- nowService := s.instance
- s.serviceAccess.RUnlock()
- if nowService != nil && nowService.connectionManager != nil {
- status.ConnectionsOut = int32(nowService.connectionManager.Count())
- }
- if nowService != nil {
- if clashServer := nowService.clashServer; clashServer != nil {
- status.TrafficAvailable = true
- trafficManager := clashServer.(*clashapi.Server).TrafficManager()
- status.UplinkTotal, status.DownlinkTotal = trafficManager.Total()
- status.ConnectionsIn = int32(trafficManager.ConnectionsLen())
- }
- }
- return &status
- }
- func (s *StartedService) SubscribeGroups(empty *emptypb.Empty, server grpc.ServerStreamingServer[Groups]) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- subscription, done, err := s.urlTestObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.urlTestObserver.UnSubscribe(subscription)
- for {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return os.ErrInvalid
- }
- groups := s.readGroups()
- s.serviceAccess.RUnlock()
- err = server.Send(groups)
- if err != nil {
- return err
- }
- select {
- case <-subscription:
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case <-done:
- return nil
- }
- }
- }
- func (s *StartedService) readGroups() *Groups {
- historyStorage := s.instance.urlTestHistoryStorage
- boxService := s.instance
- outbounds := boxService.instance.Outbound().Outbounds()
- var iGroups []adapter.OutboundGroup
- for _, it := range outbounds {
- if group, isGroup := it.(adapter.OutboundGroup); isGroup {
- iGroups = append(iGroups, group)
- }
- }
- var gs Groups
- for _, iGroup := range iGroups {
- var g Group
- g.Tag = iGroup.Tag()
- g.Type = iGroup.Type()
- _, g.Selectable = iGroup.(*group.Selector)
- g.Selected = iGroup.Now()
- if boxService.cacheFile != nil {
- if isExpand, loaded := boxService.cacheFile.LoadGroupExpand(g.Tag); loaded {
- g.IsExpand = isExpand
- }
- }
- for _, itemTag := range iGroup.All() {
- itemOutbound, isLoaded := boxService.instance.Outbound().Outbound(itemTag)
- if !isLoaded {
- continue
- }
- var item GroupItem
- item.Tag = itemTag
- item.Type = itemOutbound.Type()
- if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(itemOutbound)); history != nil {
- item.UrlTestTime = history.Time.Unix()
- item.UrlTestDelay = int32(history.Delay)
- }
- g.Items = append(g.Items, &item)
- }
- if len(g.Items) < 2 {
- continue
- }
- gs.Group = append(gs.Group, &g)
- }
- return &gs
- }
- func (s *StartedService) GetClashModeStatus(ctx context.Context, empty *emptypb.Empty) (*ClashModeStatus, error) {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- clashServer := s.instance.clashServer
- s.serviceAccess.RUnlock()
- if clashServer == nil {
- return nil, os.ErrInvalid
- }
- return &ClashModeStatus{
- ModeList: clashServer.ModeList(),
- CurrentMode: clashServer.Mode(),
- }, nil
- }
- func (s *StartedService) SubscribeClashMode(empty *emptypb.Empty, server grpc.ServerStreamingServer[ClashMode]) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- subscription, done, err := s.clashModeObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.clashModeObserver.UnSubscribe(subscription)
- for {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return os.ErrInvalid
- }
- message := &ClashMode{Mode: s.instance.clashServer.Mode()}
- s.serviceAccess.RUnlock()
- err = server.Send(message)
- if err != nil {
- return err
- }
- select {
- case <-subscription:
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case <-done:
- return nil
- }
- }
- }
- func (s *StartedService) SetClashMode(ctx context.Context, request *ClashMode) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- clashServer := s.instance.clashServer
- s.serviceAccess.RUnlock()
- clashServer.(*clashapi.Server).SetMode(request.Mode)
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) URLTest(ctx context.Context, request *URLTestRequest) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- boxService := s.instance
- s.serviceAccess.RUnlock()
- groupTag := request.OutboundTag
- abstractOutboundGroup, isLoaded := boxService.instance.Outbound().Outbound(groupTag)
- if !isLoaded {
- return nil, E.New("outbound group not found: ", groupTag)
- }
- outboundGroup, isOutboundGroup := abstractOutboundGroup.(adapter.OutboundGroup)
- if !isOutboundGroup {
- return nil, E.New("outbound is not a group: ", groupTag)
- }
- urlTest, isURLTest := abstractOutboundGroup.(*group.URLTest)
- if isURLTest {
- go urlTest.CheckOutbounds()
- } else {
- historyStorage := boxService.urlTestHistoryStorage
- outbounds := common.Filter(common.Map(outboundGroup.All(), func(it string) adapter.Outbound {
- itOutbound, _ := boxService.instance.Outbound().Outbound(it)
- return itOutbound
- }), func(it adapter.Outbound) bool {
- if it == nil {
- return false
- }
- _, isGroup := it.(adapter.OutboundGroup)
- if isGroup {
- return false
- }
- return true
- })
- b, _ := batch.New(boxService.ctx, batch.WithConcurrencyNum[any](10))
- for _, detour := range outbounds {
- outboundToTest := detour
- outboundTag := outboundToTest.Tag()
- b.Go(outboundTag, func() (any, error) {
- t, err := urltest.URLTest(boxService.ctx, "", outboundToTest)
- if err != nil {
- historyStorage.DeleteURLTestHistory(outboundTag)
- } else {
- historyStorage.StoreURLTestHistory(outboundTag, &adapter.URLTestHistory{
- Time: time.Now(),
- Delay: t,
- })
- }
- return nil, nil
- })
- }
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) SelectOutbound(ctx context.Context, request *SelectOutboundRequest) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- switch s.serviceStatus.Status {
- case ServiceStatus_STARTING, ServiceStatus_STARTED:
- default:
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- boxService := s.instance.instance
- s.serviceAccess.RUnlock()
- outboundGroup, isLoaded := boxService.Outbound().Outbound(request.GroupTag)
- if !isLoaded {
- return nil, E.New("selector not found: ", request.GroupTag)
- }
- selector, isSelector := outboundGroup.(*group.Selector)
- if !isSelector {
- return nil, E.New("outbound is not a selector: ", request.GroupTag)
- }
- if !selector.SelectOutbound(request.OutboundTag) {
- return nil, E.New("outbound not found in selector: ", request.OutboundTag)
- }
- s.urlTestObserver.Emit(struct{}{})
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) SetGroupExpand(ctx context.Context, request *SetGroupExpandRequest) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- switch s.serviceStatus.Status {
- case ServiceStatus_STARTING, ServiceStatus_STARTED:
- default:
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- boxService := s.instance
- s.serviceAccess.RUnlock()
- if boxService.cacheFile != nil {
- err := boxService.cacheFile.StoreGroupExpand(request.GroupTag, request.IsExpand)
- if err != nil {
- return nil, err
- }
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) GetSystemProxyStatus(ctx context.Context, empty *emptypb.Empty) (*SystemProxyStatus, error) {
- return s.handler.SystemProxyStatus()
- }
- func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *SetSystemProxyEnabledRequest) (*emptypb.Empty, error) {
- err := s.handler.SetSystemProxyEnabled(request.Enabled)
- if err != nil {
- return nil, err
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) TriggerDebugCrash(ctx context.Context, request *DebugCrashRequest) (*emptypb.Empty, error) {
- if !s.debug {
- return nil, status.Error(codes.PermissionDenied, "debug crash trigger unavailable")
- }
- if request == nil {
- return nil, status.Error(codes.InvalidArgument, "missing debug crash request")
- }
- switch request.Type {
- case DebugCrashRequest_GO:
- time.AfterFunc(200*time.Millisecond, func() {
- *(*int)(unsafe.Pointer(uintptr(0))) = 0
- })
- case DebugCrashRequest_NATIVE:
- err := s.handler.TriggerNativeCrash()
- if err != nil {
- return nil, err
- }
- default:
- return nil, status.Error(codes.InvalidArgument, "unknown debug crash type")
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) TriggerOOMReport(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
- instance := s.Instance()
- if instance == nil {
- return nil, status.Error(codes.FailedPrecondition, "service not started")
- }
- reporter := service.FromContext[oomkiller.OOMReporter](instance.ctx)
- if reporter == nil {
- return nil, status.Error(codes.Unavailable, "OOM reporter not available")
- }
- return &emptypb.Empty{}, reporter.WriteReport(memory.Total())
- }
- func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[ConnectionEvents]) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- s.serviceAccess.RLock()
- boxService := s.instance
- s.serviceAccess.RUnlock()
- if boxService.clashServer == nil {
- return E.New("clash server not available")
- }
- trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
- subscription, done, err := s.connectionEventObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.connectionEventObserver.UnSubscribe(subscription)
- connectionSnapshots := make(map[uuid.UUID]connectionSnapshot)
- initialEvents := s.buildInitialConnectionState(trafficManager, connectionSnapshots)
- err = server.Send(&ConnectionEvents{
- Events: initialEvents,
- Reset_: true,
- })
- if err != nil {
- return err
- }
- interval := time.Duration(request.Interval)
- if interval <= 0 {
- interval = time.Second
- }
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case <-done:
- return nil
- case event := <-subscription:
- var pendingEvents []*ConnectionEvent
- if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
- pendingEvents = append(pendingEvents, protoEvent)
- }
- drain:
- for {
- select {
- case event = <-subscription:
- if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
- pendingEvents = append(pendingEvents, protoEvent)
- }
- default:
- break drain
- }
- }
- if len(pendingEvents) > 0 {
- err = server.Send(&ConnectionEvents{Events: pendingEvents})
- if err != nil {
- return err
- }
- }
- case <-ticker.C:
- protoEvents := s.buildTrafficUpdates(trafficManager, connectionSnapshots)
- if len(protoEvents) == 0 {
- continue
- }
- err = server.Send(&ConnectionEvents{Events: protoEvents})
- if err != nil {
- return err
- }
- }
- }
- }
- type connectionSnapshot struct {
- uplink int64
- downlink int64
- hadTraffic bool
- }
- func (s *StartedService) buildInitialConnectionState(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
- var events []*ConnectionEvent
- for _, metadata := range manager.Connections() {
- events = append(events, &ConnectionEvent{
- Type: ConnectionEventType_CONNECTION_EVENT_NEW,
- Id: metadata.ID.String(),
- Connection: buildConnectionProto(metadata),
- })
- snapshots[metadata.ID] = connectionSnapshot{
- uplink: metadata.Upload.Load(),
- downlink: metadata.Download.Load(),
- }
- }
- for _, metadata := range manager.ClosedConnections() {
- conn := buildConnectionProto(metadata)
- conn.ClosedAt = metadata.ClosedAt.UnixMilli()
- events = append(events, &ConnectionEvent{
- Type: ConnectionEventType_CONNECTION_EVENT_NEW,
- Id: metadata.ID.String(),
- Connection: conn,
- })
- }
- return events
- }
- func (s *StartedService) applyConnectionEvent(event trafficontrol.ConnectionEvent, snapshots map[uuid.UUID]connectionSnapshot) *ConnectionEvent {
- switch event.Type {
- case trafficontrol.ConnectionEventNew:
- if _, exists := snapshots[event.ID]; exists {
- return nil
- }
- snapshots[event.ID] = connectionSnapshot{
- uplink: event.Metadata.Upload.Load(),
- downlink: event.Metadata.Download.Load(),
- }
- return &ConnectionEvent{
- Type: ConnectionEventType_CONNECTION_EVENT_NEW,
- Id: event.ID.String(),
- Connection: buildConnectionProto(event.Metadata),
- }
- case trafficontrol.ConnectionEventClosed:
- delete(snapshots, event.ID)
- protoEvent := &ConnectionEvent{
- Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
- Id: event.ID.String(),
- }
- closedAt := event.ClosedAt
- if closedAt.IsZero() && !event.Metadata.ClosedAt.IsZero() {
- closedAt = event.Metadata.ClosedAt
- }
- if closedAt.IsZero() {
- closedAt = time.Now()
- }
- protoEvent.ClosedAt = closedAt.UnixMilli()
- if event.Metadata.ID != uuid.Nil {
- conn := buildConnectionProto(event.Metadata)
- conn.ClosedAt = protoEvent.ClosedAt
- protoEvent.Connection = conn
- }
- return protoEvent
- default:
- return nil
- }
- }
- func (s *StartedService) buildTrafficUpdates(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
- activeConnections := manager.Connections()
- activeIndex := make(map[uuid.UUID]*trafficontrol.TrackerMetadata, len(activeConnections))
- var events []*ConnectionEvent
- for _, metadata := range activeConnections {
- activeIndex[metadata.ID] = metadata
- currentUpload := metadata.Upload.Load()
- currentDownload := metadata.Download.Load()
- snapshot, exists := snapshots[metadata.ID]
- if !exists {
- snapshots[metadata.ID] = connectionSnapshot{
- uplink: currentUpload,
- downlink: currentDownload,
- }
- events = append(events, &ConnectionEvent{
- Type: ConnectionEventType_CONNECTION_EVENT_NEW,
- Id: metadata.ID.String(),
- Connection: buildConnectionProto(metadata),
- })
- continue
- }
- uplinkDelta := currentUpload - snapshot.uplink
- downlinkDelta := currentDownload - snapshot.downlink
- if uplinkDelta < 0 || downlinkDelta < 0 {
- if snapshot.hadTraffic {
- events = append(events, &ConnectionEvent{
- Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
- Id: metadata.ID.String(),
- UplinkDelta: 0,
- DownlinkDelta: 0,
- })
- }
- snapshot.uplink = currentUpload
- snapshot.downlink = currentDownload
- snapshot.hadTraffic = false
- snapshots[metadata.ID] = snapshot
- continue
- }
- if uplinkDelta > 0 || downlinkDelta > 0 {
- snapshot.uplink = currentUpload
- snapshot.downlink = currentDownload
- snapshot.hadTraffic = true
- snapshots[metadata.ID] = snapshot
- events = append(events, &ConnectionEvent{
- Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
- Id: metadata.ID.String(),
- UplinkDelta: uplinkDelta,
- DownlinkDelta: downlinkDelta,
- })
- continue
- }
- if snapshot.hadTraffic {
- snapshot.uplink = currentUpload
- snapshot.downlink = currentDownload
- snapshot.hadTraffic = false
- snapshots[metadata.ID] = snapshot
- events = append(events, &ConnectionEvent{
- Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
- Id: metadata.ID.String(),
- UplinkDelta: 0,
- DownlinkDelta: 0,
- })
- }
- }
- var closedIndex map[uuid.UUID]*trafficontrol.TrackerMetadata
- for id := range snapshots {
- if _, exists := activeIndex[id]; exists {
- continue
- }
- if closedIndex == nil {
- closedIndex = make(map[uuid.UUID]*trafficontrol.TrackerMetadata)
- for _, metadata := range manager.ClosedConnections() {
- closedIndex[metadata.ID] = metadata
- }
- }
- closedAt := time.Now()
- var conn *Connection
- if metadata, ok := closedIndex[id]; ok {
- if !metadata.ClosedAt.IsZero() {
- closedAt = metadata.ClosedAt
- }
- conn = buildConnectionProto(metadata)
- conn.ClosedAt = closedAt.UnixMilli()
- }
- events = append(events, &ConnectionEvent{
- Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
- Id: id.String(),
- ClosedAt: closedAt.UnixMilli(),
- Connection: conn,
- })
- delete(snapshots, id)
- }
- return events
- }
- func buildConnectionProto(metadata *trafficontrol.TrackerMetadata) *Connection {
- var rule string
- if metadata.Rule != nil {
- rule = metadata.Rule.String()
- }
- uplinkTotal := metadata.Upload.Load()
- downlinkTotal := metadata.Download.Load()
- var processInfo *ProcessInfo
- if metadata.Metadata.ProcessInfo != nil {
- processInfo = &ProcessInfo{
- ProcessId: metadata.Metadata.ProcessInfo.ProcessID,
- UserId: metadata.Metadata.ProcessInfo.UserId,
- UserName: metadata.Metadata.ProcessInfo.UserName,
- ProcessPath: metadata.Metadata.ProcessInfo.ProcessPath,
- PackageNames: metadata.Metadata.ProcessInfo.AndroidPackageNames,
- }
- }
- return &Connection{
- Id: metadata.ID.String(),
- Inbound: metadata.Metadata.Inbound,
- InboundType: metadata.Metadata.InboundType,
- IpVersion: int32(metadata.Metadata.IPVersion),
- Network: metadata.Metadata.Network,
- Source: metadata.Metadata.Source.String(),
- Destination: metadata.Metadata.Destination.String(),
- Domain: metadata.Metadata.Domain,
- Protocol: metadata.Metadata.Protocol,
- User: metadata.Metadata.User,
- FromOutbound: metadata.Metadata.Outbound,
- CreatedAt: metadata.CreatedAt.UnixMilli(),
- UplinkTotal: uplinkTotal,
- DownlinkTotal: downlinkTotal,
- Rule: rule,
- Outbound: metadata.Outbound,
- OutboundType: metadata.OutboundType,
- ChainList: metadata.Chain,
- ProcessInfo: processInfo,
- }
- }
- func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- switch s.serviceStatus.Status {
- case ServiceStatus_STARTING, ServiceStatus_STARTED:
- default:
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- boxService := s.instance
- s.serviceAccess.RUnlock()
- targetConn := boxService.clashServer.(*clashapi.Server).TrafficManager().Connection(uuid.FromStringOrNil(request.Id))
- if targetConn != nil {
- targetConn.Close()
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) CloseAllConnections(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- nowService := s.instance
- s.serviceAccess.RUnlock()
- if nowService != nil && nowService.connectionManager != nil {
- nowService.connectionManager.CloseAll()
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) GetDeprecatedWarnings(ctx context.Context, empty *emptypb.Empty) (*DeprecatedWarnings, error) {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- boxService := s.instance
- s.serviceAccess.RUnlock()
- notes := service.FromContext[deprecated.Manager](boxService.ctx).(*deprecatedManager).Get()
- return &DeprecatedWarnings{
- Warnings: common.Map(notes, func(it deprecated.Note) *DeprecatedWarning {
- return &DeprecatedWarning{
- Message: it.Message(),
- Impending: it.Impending(),
- MigrationLink: it.MigrationLink,
- Description: it.Description,
- DeprecatedVersion: it.DeprecatedVersion,
- ScheduledVersion: it.ScheduledVersion,
- }
- }),
- }, nil
- }
- func (s *StartedService) GetStartedAt(ctx context.Context, empty *emptypb.Empty) (*StartedAt, error) {
- s.serviceAccess.RLock()
- defer s.serviceAccess.RUnlock()
- return &StartedAt{StartedAt: s.startedAt.UnixMilli()}, nil
- }
- func (s *StartedService) SubscribeOutbounds(_ *emptypb.Empty, server grpc.ServerStreamingServer[OutboundList]) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- subscription, done, err := s.urlTestObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.urlTestObserver.UnSubscribe(subscription)
- for {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return os.ErrInvalid
- }
- boxService := s.instance
- s.serviceAccess.RUnlock()
- historyStorage := boxService.urlTestHistoryStorage
- var list OutboundList
- for _, ob := range boxService.instance.Outbound().Outbounds() {
- item := &GroupItem{
- Tag: ob.Tag(),
- Type: ob.Type(),
- }
- if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(ob)); history != nil {
- item.UrlTestTime = history.Time.Unix()
- item.UrlTestDelay = int32(history.Delay)
- }
- list.Outbounds = append(list.Outbounds, item)
- }
- for _, ep := range boxService.instance.Endpoint().Endpoints() {
- item := &GroupItem{
- Tag: ep.Tag(),
- Type: ep.Type(),
- }
- if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(ep)); history != nil {
- item.UrlTestTime = history.Time.Unix()
- item.UrlTestDelay = int32(history.Delay)
- }
- list.Outbounds = append(list.Outbounds, item)
- }
- err = server.Send(&list)
- if err != nil {
- return err
- }
- select {
- case <-subscription:
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case <-done:
- return nil
- }
- }
- }
- func resolveOutbound(instance *Instance, tag string) (adapter.Outbound, error) {
- if tag == "" {
- return instance.instance.Outbound().Default(), nil
- }
- outbound, loaded := instance.instance.Outbound().Outbound(tag)
- if !loaded {
- return nil, E.New("outbound not found: ", tag)
- }
- return outbound, nil
- }
- func (s *StartedService) StartNetworkQualityTest(
- request *NetworkQualityTestRequest,
- server grpc.ServerStreamingServer[NetworkQualityTestProgress],
- ) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- s.serviceAccess.RLock()
- boxService := s.instance
- s.serviceAccess.RUnlock()
- outbound, err := resolveOutbound(boxService, request.OutboundTag)
- if err != nil {
- return err
- }
- resolvedDialer := dialer.NewResolveDialer(boxService.ctx, outbound, true, "", adapter.DNSQueryOptions{}, 0)
- httpClient := networkquality.NewHTTPClient(resolvedDialer)
- defer httpClient.CloseIdleConnections()
- measurementClientFactory, err := networkquality.NewOptionalHTTP3Factory(resolvedDialer, request.Http3)
- if err != nil {
- return err
- }
- result, nqErr := networkquality.Run(networkquality.Options{
- ConfigURL: request.ConfigURL,
- HTTPClient: httpClient,
- NewMeasurementClient: measurementClientFactory,
- Serial: request.Serial,
- MaxRuntime: time.Duration(request.MaxRuntimeSeconds) * time.Second,
- Context: server.Context(),
- OnProgress: func(p networkquality.Progress) {
- _ = server.Send(&NetworkQualityTestProgress{
- Phase: int32(p.Phase),
- DownloadCapacity: p.DownloadCapacity,
- UploadCapacity: p.UploadCapacity,
- DownloadRPM: p.DownloadRPM,
- UploadRPM: p.UploadRPM,
- IdleLatencyMs: p.IdleLatencyMs,
- ElapsedMs: p.ElapsedMs,
- DownloadCapacityAccuracy: int32(p.DownloadCapacityAccuracy),
- UploadCapacityAccuracy: int32(p.UploadCapacityAccuracy),
- DownloadRPMAccuracy: int32(p.DownloadRPMAccuracy),
- UploadRPMAccuracy: int32(p.UploadRPMAccuracy),
- })
- },
- })
- if nqErr != nil {
- return server.Send(&NetworkQualityTestProgress{
- IsFinal: true,
- Error: nqErr.Error(),
- })
- }
- return server.Send(&NetworkQualityTestProgress{
- Phase: int32(networkquality.PhaseDone),
- DownloadCapacity: result.DownloadCapacity,
- UploadCapacity: result.UploadCapacity,
- DownloadRPM: result.DownloadRPM,
- UploadRPM: result.UploadRPM,
- IdleLatencyMs: result.IdleLatencyMs,
- IsFinal: true,
- DownloadCapacityAccuracy: int32(result.DownloadCapacityAccuracy),
- UploadCapacityAccuracy: int32(result.UploadCapacityAccuracy),
- DownloadRPMAccuracy: int32(result.DownloadRPMAccuracy),
- UploadRPMAccuracy: int32(result.UploadRPMAccuracy),
- })
- }
- func (s *StartedService) StartSTUNTest(
- request *STUNTestRequest,
- server grpc.ServerStreamingServer[STUNTestProgress],
- ) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- s.serviceAccess.RLock()
- boxService := s.instance
- s.serviceAccess.RUnlock()
- outbound, err := resolveOutbound(boxService, request.OutboundTag)
- if err != nil {
- return err
- }
- resolvedDialer := dialer.NewResolveDialer(boxService.ctx, outbound, true, "", adapter.DNSQueryOptions{}, 0)
- result, stunErr := stun.Run(stun.Options{
- Server: request.Server,
- Dialer: resolvedDialer,
- Context: server.Context(),
- OnProgress: func(p stun.Progress) {
- _ = server.Send(&STUNTestProgress{
- Phase: int32(p.Phase),
- ExternalAddr: p.ExternalAddr,
- LatencyMs: p.LatencyMs,
- NatMapping: int32(p.NATMapping),
- NatFiltering: int32(p.NATFiltering),
- })
- },
- })
- if stunErr != nil {
- return server.Send(&STUNTestProgress{
- IsFinal: true,
- Error: stunErr.Error(),
- })
- }
- return server.Send(&STUNTestProgress{
- Phase: int32(stun.PhaseDone),
- ExternalAddr: result.ExternalAddr,
- LatencyMs: result.LatencyMs,
- NatMapping: int32(result.NATMapping),
- NatFiltering: int32(result.NATFiltering),
- IsFinal: true,
- NatTypeSupported: result.NATTypeSupported,
- })
- }
- func (s *StartedService) SubscribeTailscaleStatus(
- _ *emptypb.Empty,
- server grpc.ServerStreamingServer[TailscaleStatusUpdate],
- ) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- s.serviceAccess.RLock()
- boxService := s.instance
- s.serviceAccess.RUnlock()
- endpointManager := service.FromContext[adapter.EndpointManager](boxService.ctx)
- if endpointManager == nil {
- return status.Error(codes.FailedPrecondition, "endpoint manager not available")
- }
- type tailscaleEndpoint struct {
- tag string
- provider adapter.TailscaleEndpoint
- }
- var endpoints []tailscaleEndpoint
- for _, endpoint := range endpointManager.Endpoints() {
- if endpoint.Type() != C.TypeTailscale {
- continue
- }
- provider, loaded := endpoint.(adapter.TailscaleEndpoint)
- if !loaded {
- continue
- }
- endpoints = append(endpoints, tailscaleEndpoint{
- tag: endpoint.Tag(),
- provider: provider,
- })
- }
- if len(endpoints) == 0 {
- return status.Error(codes.NotFound, "no Tailscale endpoint found")
- }
- type taggedStatus struct {
- tag string
- status *adapter.TailscaleEndpointStatus
- }
- updates := make(chan taggedStatus, len(endpoints))
- ctx, cancel := context.WithCancel(server.Context())
- defer cancel()
- var waitGroup sync.WaitGroup
- for _, endpoint := range endpoints {
- waitGroup.Add(1)
- go func(tag string, provider adapter.TailscaleEndpoint) {
- defer waitGroup.Done()
- _ = provider.SubscribeTailscaleStatus(ctx, func(endpointStatus *adapter.TailscaleEndpointStatus) {
- select {
- case updates <- taggedStatus{tag: tag, status: endpointStatus}:
- case <-ctx.Done():
- }
- })
- }(endpoint.tag, endpoint.provider)
- }
- go func() {
- waitGroup.Wait()
- close(updates)
- }()
- var tags []string
- statuses := make(map[string]*adapter.TailscaleEndpointStatus, len(endpoints))
- for update := range updates {
- if _, exists := statuses[update.tag]; !exists {
- tags = append(tags, update.tag)
- }
- statuses[update.tag] = update.status
- protoEndpoints := make([]*TailscaleEndpointStatus, 0, len(statuses))
- for _, tag := range tags {
- protoEndpoints = append(protoEndpoints, tailscaleEndpointStatusToProto(tag, statuses[tag]))
- }
- sendErr := server.Send(&TailscaleStatusUpdate{
- Endpoints: protoEndpoints,
- })
- if sendErr != nil {
- return sendErr
- }
- }
- return nil
- }
- func tailscaleEndpointStatusToProto(tag string, s *adapter.TailscaleEndpointStatus) *TailscaleEndpointStatus {
- userGroups := make([]*TailscaleUserGroup, len(s.UserGroups))
- for i, group := range s.UserGroups {
- peers := make([]*TailscalePeer, len(group.Peers))
- for j, peer := range group.Peers {
- peers[j] = tailscalePeerToProto(peer)
- }
- userGroups[i] = &TailscaleUserGroup{
- UserID: group.UserID,
- LoginName: group.LoginName,
- DisplayName: group.DisplayName,
- ProfilePicURL: group.ProfilePicURL,
- Peers: peers,
- }
- }
- result := &TailscaleEndpointStatus{
- EndpointTag: tag,
- BackendState: s.BackendState,
- AuthURL: s.AuthURL,
- NetworkName: s.NetworkName,
- MagicDNSSuffix: s.MagicDNSSuffix,
- UserGroups: userGroups,
- }
- if s.Self != nil {
- result.Self = tailscalePeerToProto(s.Self)
- }
- return result
- }
- func tailscalePeerToProto(peer *adapter.TailscalePeer) *TailscalePeer {
- return &TailscalePeer{
- HostName: peer.HostName,
- DnsName: peer.DNSName,
- Os: peer.OS,
- TailscaleIPs: peer.TailscaleIPs,
- Online: peer.Online,
- ExitNode: peer.ExitNode,
- ExitNodeOption: peer.ExitNodeOption,
- Active: peer.Active,
- RxBytes: peer.RxBytes,
- TxBytes: peer.TxBytes,
- KeyExpiry: peer.KeyExpiry,
- }
- }
- func (s *StartedService) StartTailscalePing(
- request *TailscalePingRequest,
- server grpc.ServerStreamingServer[TailscalePingResponse],
- ) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- s.serviceAccess.RLock()
- boxService := s.instance
- s.serviceAccess.RUnlock()
- endpointManager := service.FromContext[adapter.EndpointManager](boxService.ctx)
- if endpointManager == nil {
- return status.Error(codes.FailedPrecondition, "endpoint manager not available")
- }
- var provider adapter.TailscaleEndpoint
- if request.EndpointTag != "" {
- endpoint, loaded := endpointManager.Get(request.EndpointTag)
- if !loaded {
- return status.Error(codes.NotFound, "endpoint not found: "+request.EndpointTag)
- }
- if endpoint.Type() != C.TypeTailscale {
- return status.Error(codes.InvalidArgument, "endpoint is not Tailscale: "+request.EndpointTag)
- }
- pingProvider, loaded := endpoint.(adapter.TailscaleEndpoint)
- if !loaded {
- return status.Error(codes.FailedPrecondition, "endpoint does not support ping")
- }
- provider = pingProvider
- } else {
- for _, endpoint := range endpointManager.Endpoints() {
- if endpoint.Type() != C.TypeTailscale {
- continue
- }
- pingProvider, loaded := endpoint.(adapter.TailscaleEndpoint)
- if loaded {
- provider = pingProvider
- break
- }
- }
- if provider == nil {
- return status.Error(codes.NotFound, "no Tailscale endpoint found")
- }
- }
- return provider.StartTailscalePing(server.Context(), request.PeerIP, func(result *adapter.TailscalePingResult) {
- _ = server.Send(&TailscalePingResponse{
- LatencyMs: result.LatencyMs,
- IsDirect: result.IsDirect,
- Endpoint: result.Endpoint,
- DerpRegionID: result.DERPRegionID,
- DerpRegionCode: result.DERPRegionCode,
- Error: result.Error,
- })
- })
- }
- func (s *StartedService) mustEmbedUnimplementedStartedServiceServer() {
- }
- func (s *StartedService) WriteMessage(level log.Level, message string) {
- item := &log.Entry{Level: level, Message: message}
- s.logAccess.Lock()
- s.logLines.PushBack(item)
- if s.logLines.Len() > s.logMaxLines {
- s.logLines.Remove(s.logLines.Front())
- }
- s.logAccess.Unlock()
- s.logSubscriber.Emit(item)
- if s.debug {
- s.handler.WriteDebugMessage(message)
- }
- }
- func (s *StartedService) Instance() *Instance {
- s.serviceAccess.RLock()
- defer s.serviceAccess.RUnlock()
- return s.instance
- }
|