123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826 |
- package daemon
- import (
- "context"
- "os"
- "runtime"
- "sync"
- "time"
- "github.com/sagernet/sing-box/adapter"
- "github.com/sagernet/sing-box/common/conntrack"
- "github.com/sagernet/sing-box/common/urltest"
- "github.com/sagernet/sing-box/experimental/clashapi"
- "github.com/sagernet/sing-box/experimental/clashapi/trafficontrol"
- "github.com/sagernet/sing-box/experimental/deprecated"
- "github.com/sagernet/sing-box/log"
- "github.com/sagernet/sing-box/protocol/group"
- "github.com/sagernet/sing/common"
- "github.com/sagernet/sing/common/batch"
- E "github.com/sagernet/sing/common/exceptions"
- "github.com/sagernet/sing/common/memory"
- "github.com/sagernet/sing/common/observable"
- "github.com/sagernet/sing/common/x/list"
- "github.com/sagernet/sing/service"
- "github.com/gofrs/uuid/v5"
- "google.golang.org/grpc"
- "google.golang.org/protobuf/types/known/emptypb"
- )
- var _ StartedServiceServer = (*StartedService)(nil)
- type StartedService struct {
- ctx context.Context
- // platform adapter.PlatformInterface
- handler PlatformHandler
- debug bool
- logMaxLines int
- // workingDirectory string
- // tempDirectory string
- // userID int
- // groupID int
- // systemProxyEnabled bool
- serviceAccess sync.RWMutex
- serviceStatus *ServiceStatus
- serviceStatusSubscriber *observable.Subscriber[*ServiceStatus]
- serviceStatusObserver *observable.Observer[*ServiceStatus]
- logAccess sync.RWMutex
- logLines list.List[*log.Entry]
- logSubscriber *observable.Subscriber[*log.Entry]
- logObserver *observable.Observer[*log.Entry]
- instance *Instance
- urlTestSubscriber *observable.Subscriber[struct{}]
- urlTestObserver *observable.Observer[struct{}]
- urlTestHistoryStorage *urltest.HistoryStorage
- clashModeSubscriber *observable.Subscriber[struct{}]
- clashModeObserver *observable.Observer[struct{}]
- }
- type ServiceOptions struct {
- Context context.Context
- // Platform adapter.PlatformInterface
- Handler PlatformHandler
- Debug bool
- LogMaxLines int
- // WorkingDirectory string
- // TempDirectory string
- // UserID int
- // GroupID int
- // SystemProxyEnabled bool
- }
- func NewStartedService(options ServiceOptions) *StartedService {
- s := &StartedService{
- ctx: options.Context,
- // platform: options.Platform,
- handler: options.Handler,
- debug: options.Debug,
- logMaxLines: options.LogMaxLines,
- // workingDirectory: options.WorkingDirectory,
- // tempDirectory: options.TempDirectory,
- // userID: options.UserID,
- // groupID: options.GroupID,
- // systemProxyEnabled: options.SystemProxyEnabled,
- serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
- serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
- logSubscriber: observable.NewSubscriber[*log.Entry](128),
- urlTestSubscriber: observable.NewSubscriber[struct{}](1),
- urlTestHistoryStorage: urltest.NewHistoryStorage(),
- clashModeSubscriber: observable.NewSubscriber[struct{}](1),
- }
- s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
- s.logObserver = observable.NewObserver(s.logSubscriber, 64)
- s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
- s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
- return s
- }
- func (s *StartedService) resetLogs() {
- s.logAccess.Lock()
- s.logLines = list.List[*log.Entry]{}
- s.logAccess.Unlock()
- s.logSubscriber.Emit(nil)
- }
- func (s *StartedService) updateStatus(newStatus ServiceStatus_Type) {
- statusObject := &ServiceStatus{Status: newStatus}
- s.serviceStatusSubscriber.Emit(statusObject)
- s.serviceStatus = statusObject
- }
- func (s *StartedService) updateStatusError(err error) error {
- statusObject := &ServiceStatus{Status: ServiceStatus_FATAL, ErrorMessage: err.Error()}
- s.serviceStatusSubscriber.Emit(statusObject)
- s.serviceStatus = statusObject
- s.serviceAccess.Unlock()
- return err
- }
- func (s *StartedService) waitForStarted(ctx context.Context) error {
- s.serviceAccess.RLock()
- currentStatus := s.serviceStatus.Status
- s.serviceAccess.RUnlock()
- switch currentStatus {
- case ServiceStatus_STARTED:
- return nil
- case ServiceStatus_STARTING:
- default:
- return os.ErrInvalid
- }
- subscription, done, err := s.serviceStatusObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.serviceStatusObserver.UnSubscribe(subscription)
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-s.ctx.Done():
- return s.ctx.Err()
- case status := <-subscription:
- switch status.Status {
- case ServiceStatus_STARTED:
- return nil
- case ServiceStatus_FATAL:
- return E.New(status.ErrorMessage)
- case ServiceStatus_IDLE, ServiceStatus_STOPPING:
- return os.ErrInvalid
- }
- case <-done:
- return os.ErrClosed
- }
- }
- }
- func (s *StartedService) StartOrReloadService(profileContent string, options *OverrideOptions) error {
- s.serviceAccess.Lock()
- switch s.serviceStatus.Status {
- case ServiceStatus_IDLE, ServiceStatus_STARTED, ServiceStatus_STARTING:
- default:
- s.serviceAccess.Unlock()
- return os.ErrInvalid
- }
- oldInstance := s.instance
- if oldInstance != nil {
- s.updateStatus(ServiceStatus_STOPPING)
- s.serviceAccess.Unlock()
- _ = oldInstance.Close()
- s.serviceAccess.Lock()
- }
- s.updateStatus(ServiceStatus_STARTING)
- s.resetLogs()
- instance, err := s.newInstance(profileContent, options)
- if err != nil {
- return s.updateStatusError(err)
- }
- s.instance = instance
- instance.urlTestHistoryStorage.SetHook(s.urlTestSubscriber)
- if instance.clashServer != nil {
- instance.clashServer.SetModeUpdateHook(s.clashModeSubscriber)
- }
- s.serviceAccess.Unlock()
- err = instance.Start()
- s.serviceAccess.Lock()
- if s.serviceStatus.Status != ServiceStatus_STARTING {
- s.serviceAccess.Unlock()
- return nil
- }
- if err != nil {
- return s.updateStatusError(err)
- }
- s.updateStatus(ServiceStatus_STARTED)
- s.serviceAccess.Unlock()
- runtime.GC()
- return nil
- }
- func (s *StartedService) CloseService() error {
- s.serviceAccess.Lock()
- switch s.serviceStatus.Status {
- case ServiceStatus_STARTING, ServiceStatus_STARTED:
- default:
- s.serviceAccess.Unlock()
- return os.ErrInvalid
- }
- s.updateStatus(ServiceStatus_STOPPING)
- if s.instance != nil {
- err := s.instance.Close()
- if err != nil {
- return s.updateStatusError(err)
- }
- }
- s.instance = nil
- s.updateStatus(ServiceStatus_IDLE)
- s.serviceAccess.Unlock()
- runtime.GC()
- return nil
- }
- func (s *StartedService) SetError(err error) {
- s.serviceAccess.Lock()
- s.updateStatusError(err)
- s.WriteMessage(log.LevelError, err.Error())
- }
- func (s *StartedService) StopService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
- err := s.handler.ServiceStop()
- if err != nil {
- return nil, err
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) ReloadService(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
- err := s.handler.ServiceReload()
- if err != nil {
- return nil, err
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) SubscribeServiceStatus(empty *emptypb.Empty, server grpc.ServerStreamingServer[ServiceStatus]) error {
- subscription, done, err := s.serviceStatusObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.serviceStatusObserver.UnSubscribe(subscription)
- err = server.Send(s.serviceStatus)
- if err != nil {
- return err
- }
- for {
- select {
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case newStatus := <-subscription:
- err = server.Send(newStatus)
- if err != nil {
- return err
- }
- case <-done:
- return nil
- }
- }
- }
- func (s *StartedService) SubscribeLog(empty *emptypb.Empty, server grpc.ServerStreamingServer[Log]) error {
- var savedLines []*log.Entry
- s.logAccess.Lock()
- savedLines = make([]*log.Entry, 0, s.logLines.Len())
- for element := s.logLines.Front(); element != nil; element = element.Next() {
- savedLines = append(savedLines, element.Value)
- }
- s.logAccess.Unlock()
- subscription, done, err := s.logObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.logObserver.UnSubscribe(subscription)
- err = server.Send(&Log{
- Messages: common.Map(savedLines, func(it *log.Entry) *Log_Message {
- return &Log_Message{
- Level: LogLevel(it.Level),
- Message: it.Message,
- }
- }),
- Reset_: true,
- })
- if err != nil {
- return err
- }
- for {
- select {
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case message := <-subscription:
- if message == nil {
- err = server.Send(&Log{Reset_: true})
- if err != nil {
- return err
- }
- continue
- }
- messages := []*Log_Message{{
- Level: LogLevel(message.Level),
- Message: message.Message,
- }}
- fetch:
- for {
- select {
- case message = <-subscription:
- messages = append(messages, &Log_Message{
- Level: LogLevel(message.Level),
- Message: message.Message,
- })
- default:
- break fetch
- }
- }
- err = server.Send(&Log{Messages: messages})
- if err != nil {
- return err
- }
- case <-done:
- return nil
- }
- }
- }
- func (s *StartedService) GetDefaultLogLevel(ctx context.Context, empty *emptypb.Empty) (*DefaultLogLevel, error) {
- s.serviceAccess.RLock()
- switch s.serviceStatus.Status {
- case ServiceStatus_STARTING, ServiceStatus_STARTED:
- default:
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- logLevel := s.instance.instance.LogFactory().Level()
- s.serviceAccess.RUnlock()
- return &DefaultLogLevel{Level: LogLevel(logLevel)}, nil
- }
- func (s *StartedService) SubscribeStatus(request *SubscribeStatusRequest, server grpc.ServerStreamingServer[Status]) error {
- interval := time.Duration(request.Interval)
- if interval <= 0 {
- interval = time.Second // Default to 1 second
- }
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- status := s.readStatus()
- uploadTotal := status.UplinkTotal
- downloadTotal := status.DownlinkTotal
- for {
- err := server.Send(status)
- if err != nil {
- return err
- }
- select {
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case <-ticker.C:
- }
- status = s.readStatus()
- upload := status.UplinkTotal - uploadTotal
- download := status.DownlinkTotal - downloadTotal
- uploadTotal = status.UplinkTotal
- downloadTotal = status.DownlinkTotal
- status.Uplink = upload
- status.Downlink = download
- }
- }
- func (s *StartedService) readStatus() *Status {
- var status Status
- status.Memory = memory.Inuse()
- status.Goroutines = int32(runtime.NumGoroutine())
- status.ConnectionsOut = int32(conntrack.Count())
- s.serviceAccess.RLock()
- nowService := s.instance
- s.serviceAccess.RUnlock()
- if nowService != nil {
- if clashServer := nowService.clashServer; clashServer != nil {
- status.TrafficAvailable = true
- trafficManager := clashServer.(*clashapi.Server).TrafficManager()
- status.UplinkTotal, status.DownlinkTotal = trafficManager.Total()
- status.ConnectionsIn = int32(trafficManager.ConnectionsLen())
- }
- }
- return &status
- }
- func (s *StartedService) SubscribeGroups(empty *emptypb.Empty, server grpc.ServerStreamingServer[Groups]) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- subscription, done, err := s.urlTestObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.urlTestObserver.UnSubscribe(subscription)
- for {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return os.ErrInvalid
- }
- groups := s.readGroups()
- s.serviceAccess.RUnlock()
- err = server.Send(groups)
- if err != nil {
- return err
- }
- select {
- case <-subscription:
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case <-done:
- return nil
- }
- }
- }
- func (s *StartedService) readGroups() *Groups {
- historyStorage := s.instance.urlTestHistoryStorage
- boxService := s.instance
- outbounds := boxService.instance.Outbound().Outbounds()
- var iGroups []adapter.OutboundGroup
- for _, it := range outbounds {
- if group, isGroup := it.(adapter.OutboundGroup); isGroup {
- iGroups = append(iGroups, group)
- }
- }
- var gs Groups
- for _, iGroup := range iGroups {
- var g Group
- g.Tag = iGroup.Tag()
- g.Type = iGroup.Type()
- _, g.Selectable = iGroup.(*group.Selector)
- g.Selected = iGroup.Now()
- if boxService.cacheFile != nil {
- if isExpand, loaded := boxService.cacheFile.LoadGroupExpand(g.Tag); loaded {
- g.IsExpand = isExpand
- }
- }
- for _, itemTag := range iGroup.All() {
- itemOutbound, isLoaded := boxService.instance.Outbound().Outbound(itemTag)
- if !isLoaded {
- continue
- }
- var item GroupItem
- item.Tag = itemTag
- item.Type = itemOutbound.Type()
- if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(itemOutbound)); history != nil {
- item.UrlTestTime = history.Time.Unix()
- item.UrlTestDelay = int32(history.Delay)
- }
- g.Items = append(g.Items, &item)
- }
- if len(g.Items) < 2 {
- continue
- }
- gs.Group = append(gs.Group, &g)
- }
- return &gs
- }
- func (s *StartedService) GetClashModeStatus(ctx context.Context, empty *emptypb.Empty) (*ClashModeStatus, error) {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- clashServer := s.instance.clashServer
- s.serviceAccess.RUnlock()
- if clashServer == nil {
- return nil, os.ErrInvalid
- }
- return &ClashModeStatus{
- ModeList: clashServer.ModeList(),
- CurrentMode: clashServer.Mode(),
- }, nil
- }
- func (s *StartedService) SubscribeClashMode(empty *emptypb.Empty, server grpc.ServerStreamingServer[ClashMode]) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- subscription, done, err := s.clashModeObserver.Subscribe()
- if err != nil {
- return err
- }
- defer s.clashModeObserver.UnSubscribe(subscription)
- for {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return os.ErrInvalid
- }
- message := &ClashMode{Mode: s.instance.clashServer.Mode()}
- s.serviceAccess.RUnlock()
- err = server.Send(message)
- if err != nil {
- return err
- }
- select {
- case <-subscription:
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case <-done:
- return nil
- }
- }
- }
- func (s *StartedService) SetClashMode(ctx context.Context, request *ClashMode) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- clashServer := s.instance.clashServer
- s.serviceAccess.RUnlock()
- clashServer.(*clashapi.Server).SetMode(request.Mode)
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) URLTest(ctx context.Context, request *URLTestRequest) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- boxService := s.instance
- s.serviceAccess.RUnlock()
- groupTag := request.OutboundTag
- abstractOutboundGroup, isLoaded := boxService.instance.Outbound().Outbound(groupTag)
- if !isLoaded {
- return nil, E.New("outbound group not found: ", groupTag)
- }
- outboundGroup, isOutboundGroup := abstractOutboundGroup.(adapter.OutboundGroup)
- if !isOutboundGroup {
- return nil, E.New("outbound is not a group: ", groupTag)
- }
- urlTest, isURLTest := abstractOutboundGroup.(*group.URLTest)
- if isURLTest {
- go urlTest.CheckOutbounds()
- } else {
- historyStorage := boxService.urlTestHistoryStorage
- outbounds := common.Filter(common.Map(outboundGroup.All(), func(it string) adapter.Outbound {
- itOutbound, _ := boxService.instance.Outbound().Outbound(it)
- return itOutbound
- }), func(it adapter.Outbound) bool {
- if it == nil {
- return false
- }
- _, isGroup := it.(adapter.OutboundGroup)
- if isGroup {
- return false
- }
- return true
- })
- b, _ := batch.New(boxService.ctx, batch.WithConcurrencyNum[any](10))
- for _, detour := range outbounds {
- outboundToTest := detour
- outboundTag := outboundToTest.Tag()
- b.Go(outboundTag, func() (any, error) {
- t, err := urltest.URLTest(boxService.ctx, "", outboundToTest)
- if err != nil {
- historyStorage.DeleteURLTestHistory(outboundTag)
- } else {
- historyStorage.StoreURLTestHistory(outboundTag, &adapter.URLTestHistory{
- Time: time.Now(),
- Delay: t,
- })
- }
- return nil, nil
- })
- }
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) SelectOutbound(ctx context.Context, request *SelectOutboundRequest) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- switch s.serviceStatus.Status {
- case ServiceStatus_STARTING, ServiceStatus_STARTED:
- default:
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- boxService := s.instance.instance
- s.serviceAccess.RUnlock()
- outboundGroup, isLoaded := boxService.Outbound().Outbound(request.GroupTag)
- if !isLoaded {
- return nil, E.New("selector not found: ", request.GroupTag)
- }
- selector, isSelector := outboundGroup.(*group.Selector)
- if !isSelector {
- return nil, E.New("outbound is not a selector: ", request.GroupTag)
- }
- if !selector.SelectOutbound(request.OutboundTag) {
- return nil, E.New("outbound not found in selector: ", request.OutboundTag)
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) SetGroupExpand(ctx context.Context, request *SetGroupExpandRequest) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- switch s.serviceStatus.Status {
- case ServiceStatus_STARTING, ServiceStatus_STARTED:
- default:
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- boxService := s.instance
- s.serviceAccess.RUnlock()
- if boxService.cacheFile != nil {
- err := boxService.cacheFile.StoreGroupExpand(request.GroupTag, request.IsExpand)
- if err != nil {
- return nil, err
- }
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) GetSystemProxyStatus(ctx context.Context, empty *emptypb.Empty) (*SystemProxyStatus, error) {
- return s.handler.SystemProxyStatus()
- }
- func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *SetSystemProxyEnabledRequest) (*emptypb.Empty, error) {
- err := s.handler.SetSystemProxyEnabled(request.Enabled)
- if err != nil {
- return nil, err
- }
- return nil, err
- }
- func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[Connections]) error {
- err := s.waitForStarted(server.Context())
- if err != nil {
- return err
- }
- s.serviceAccess.RLock()
- boxService := s.instance
- s.serviceAccess.RUnlock()
- ticker := time.NewTicker(time.Duration(request.Interval))
- defer ticker.Stop()
- trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
- var (
- connections = make(map[uuid.UUID]*Connection)
- outConnections []*Connection
- )
- for {
- outConnections = outConnections[:0]
- for _, connection := range trafficManager.Connections() {
- outConnections = append(outConnections, newConnection(connections, connection, false))
- }
- for _, connection := range trafficManager.ClosedConnections() {
- outConnections = append(outConnections, newConnection(connections, connection, true))
- }
- err := server.Send(&Connections{Connections: outConnections})
- if err != nil {
- return err
- }
- select {
- case <-s.ctx.Done():
- return s.ctx.Err()
- case <-server.Context().Done():
- return server.Context().Err()
- case <-ticker.C:
- }
- }
- }
- func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol.TrackerMetadata, isClosed bool) *Connection {
- if oldConnection, loaded := connections[metadata.ID]; loaded {
- if isClosed {
- if oldConnection.ClosedAt == 0 {
- oldConnection.Uplink = 0
- oldConnection.Downlink = 0
- oldConnection.ClosedAt = metadata.ClosedAt.UnixMilli()
- }
- return oldConnection
- }
- lastUplink := oldConnection.UplinkTotal
- lastDownlink := oldConnection.DownlinkTotal
- uplinkTotal := metadata.Upload.Load()
- downlinkTotal := metadata.Download.Load()
- oldConnection.Uplink = uplinkTotal - lastUplink
- oldConnection.Downlink = downlinkTotal - lastDownlink
- oldConnection.UplinkTotal = uplinkTotal
- oldConnection.DownlinkTotal = downlinkTotal
- return oldConnection
- }
- var rule string
- if metadata.Rule != nil {
- rule = metadata.Rule.String()
- }
- uplinkTotal := metadata.Upload.Load()
- downlinkTotal := metadata.Download.Load()
- uplink := uplinkTotal
- downlink := downlinkTotal
- var closedAt int64
- if !metadata.ClosedAt.IsZero() {
- closedAt = metadata.ClosedAt.UnixMilli()
- uplink = 0
- downlink = 0
- }
- connection := &Connection{
- Id: metadata.ID.String(),
- Inbound: metadata.Metadata.Inbound,
- InboundType: metadata.Metadata.InboundType,
- IpVersion: int32(metadata.Metadata.IPVersion),
- Network: metadata.Metadata.Network,
- Source: metadata.Metadata.Source.String(),
- Destination: metadata.Metadata.Destination.String(),
- Domain: metadata.Metadata.Domain,
- Protocol: metadata.Metadata.Protocol,
- User: metadata.Metadata.User,
- FromOutbound: metadata.Metadata.Outbound,
- CreatedAt: metadata.CreatedAt.UnixMilli(),
- ClosedAt: closedAt,
- Uplink: uplink,
- Downlink: downlink,
- UplinkTotal: uplinkTotal,
- DownlinkTotal: downlinkTotal,
- Rule: rule,
- Outbound: metadata.Outbound,
- OutboundType: metadata.OutboundType,
- ChainList: metadata.Chain,
- }
- connections[metadata.ID] = connection
- return connection
- }
- func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {
- s.serviceAccess.RLock()
- switch s.serviceStatus.Status {
- case ServiceStatus_STARTING, ServiceStatus_STARTED:
- default:
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- boxService := s.instance
- s.serviceAccess.RUnlock()
- targetConn := boxService.clashServer.(*clashapi.Server).TrafficManager().Connection(uuid.FromStringOrNil(request.Id))
- if targetConn != nil {
- targetConn.Close()
- }
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) CloseAllConnections(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) {
- conntrack.Close()
- return &emptypb.Empty{}, nil
- }
- func (s *StartedService) GetDeprecatedWarnings(ctx context.Context, empty *emptypb.Empty) (*DeprecatedWarnings, error) {
- s.serviceAccess.RLock()
- if s.serviceStatus.Status != ServiceStatus_STARTED {
- s.serviceAccess.RUnlock()
- return nil, os.ErrInvalid
- }
- boxService := s.instance
- s.serviceAccess.RUnlock()
- notes := service.FromContext[deprecated.Manager](boxService.ctx).(*deprecatedManager).Get()
- return &DeprecatedWarnings{
- Warnings: common.Map(notes, func(it deprecated.Note) *DeprecatedWarning {
- return &DeprecatedWarning{
- Message: it.Message(),
- Impending: it.Impending(),
- MigrationLink: it.MigrationLink,
- }
- }),
- }, nil
- }
- func (s *StartedService) SubscribeHelperEvents(empty *emptypb.Empty, server grpc.ServerStreamingServer[HelperRequest]) error {
- return os.ErrInvalid
- }
- func (s *StartedService) SendHelperResponse(ctx context.Context, response *HelperResponse) (*emptypb.Empty, error) {
- return nil, os.ErrInvalid
- }
- func (s *StartedService) mustEmbedUnimplementedStartedServiceServer() {
- }
- func (s *StartedService) WriteMessage(level log.Level, message string) {
- item := &log.Entry{Level: level, Message: message}
- s.logSubscriber.Emit(item)
- s.logAccess.Lock()
- s.logLines.PushBack(item)
- if s.logLines.Len() > s.logMaxLines {
- s.logLines.Remove(s.logLines.Front())
- }
- s.logAccess.Unlock()
- if s.debug {
- s.handler.WriteDebugMessage(message)
- }
- }
- func (s *StartedService) Instance() *Instance {
- s.serviceAccess.RLock()
- defer s.serviceAccess.RUnlock()
- return s.instance
- }
|