123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- package command_test
- import (
- "context"
- "testing"
- "time"
- "github.com/golang/mock/gomock"
- "github.com/google/go-cmp/cmp"
- "github.com/google/go-cmp/cmp/cmpopts"
- "github.com/xtls/xray-core/v1/app/router"
- . "github.com/xtls/xray-core/v1/app/router/command"
- "github.com/xtls/xray-core/v1/app/stats"
- "github.com/xtls/xray-core/v1/common"
- "github.com/xtls/xray-core/v1/common/net"
- "github.com/xtls/xray-core/v1/features/routing"
- "github.com/xtls/xray-core/v1/testing/mocks"
- "google.golang.org/grpc"
- "google.golang.org/grpc/test/bufconn"
- )
- func TestServiceSubscribeRoutingStats(t *testing.T) {
- c := stats.NewChannel(&stats.ChannelConfig{
- SubscriberLimit: 1,
- BufferSize: 0,
- Blocking: true,
- })
- common.Must(c.Start())
- defer c.Close()
- lis := bufconn.Listen(1024 * 1024)
- bufDialer := func(context.Context, string) (net.Conn, error) {
- return lis.Dial()
- }
- testCases := []*RoutingContext{
- {InboundTag: "in", OutboundTag: "out"},
- {TargetIPs: [][]byte{{1, 2, 3, 4}}, TargetPort: 8080, OutboundTag: "out"},
- {TargetDomain: "example.com", TargetPort: 443, OutboundTag: "out"},
- {SourcePort: 9999, TargetPort: 9999, OutboundTag: "out"},
- {Network: net.Network_UDP, OutboundGroupTags: []string{"outergroup", "innergroup"}, OutboundTag: "out"},
- {Protocol: "bittorrent", OutboundTag: "blocked"},
- {User: "[email protected]", OutboundTag: "out"},
- {SourceIPs: [][]byte{{127, 0, 0, 1}}, Attributes: map[string]string{"attr": "value"}, OutboundTag: "out"},
- }
- errCh := make(chan error)
- nextPub := make(chan struct{})
- // Server goroutine
- go func() {
- server := grpc.NewServer()
- RegisterRoutingServiceServer(server, NewRoutingServer(nil, c))
- errCh <- server.Serve(lis)
- }()
- // Publisher goroutine
- go func() {
- publishTestCases := func() error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
- for { // Wait until there's one subscriber in routing stats channel
- if len(c.Subscribers()) > 0 {
- break
- }
- if ctx.Err() != nil {
- return ctx.Err()
- }
- }
- for _, tc := range testCases {
- c.Publish(context.Background(), AsRoutingRoute(tc))
- time.Sleep(time.Millisecond)
- }
- return nil
- }
- if err := publishTestCases(); err != nil {
- errCh <- err
- }
- // Wait for next round of publishing
- <-nextPub
- if err := publishTestCases(); err != nil {
- errCh <- err
- }
- }()
- // Client goroutine
- go func() {
- defer lis.Close()
- conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
- if err != nil {
- errCh <- err
- return
- }
- defer conn.Close()
- client := NewRoutingServiceClient(conn)
- // Test retrieving all fields
- testRetrievingAllFields := func() error {
- streamCtx, streamClose := context.WithCancel(context.Background())
- // Test the unsubscription of stream works well
- defer func() {
- streamClose()
- timeOutCtx, timeout := context.WithTimeout(context.Background(), time.Second)
- defer timeout()
- for { // Wait until there's no subscriber in routing stats channel
- if len(c.Subscribers()) == 0 {
- break
- }
- if timeOutCtx.Err() != nil {
- t.Error("unexpected subscribers not decreased in channel", timeOutCtx.Err())
- }
- }
- }()
- stream, err := client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{})
- if err != nil {
- return err
- }
- for _, tc := range testCases {
- msg, err := stream.Recv()
- if err != nil {
- return err
- }
- if r := cmp.Diff(msg, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
- t.Error(r)
- }
- }
- // Test that double subscription will fail
- errStream, err := client.SubscribeRoutingStats(context.Background(), &SubscribeRoutingStatsRequest{
- FieldSelectors: []string{"ip", "port", "domain", "outbound"},
- })
- if err != nil {
- return err
- }
- if _, err := errStream.Recv(); err == nil {
- t.Error("unexpected successful subscription")
- }
- return nil
- }
- // Test retrieving only a subset of fields
- testRetrievingSubsetOfFields := func() error {
- streamCtx, streamClose := context.WithCancel(context.Background())
- defer streamClose()
- stream, err := client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{
- FieldSelectors: []string{"ip", "port", "domain", "outbound"},
- })
- if err != nil {
- return err
- }
- // Send nextPub signal to start next round of publishing
- close(nextPub)
- for _, tc := range testCases {
- msg, err := stream.Recv()
- if err != nil {
- return err
- }
- stat := &RoutingContext{ // Only a subset of stats is retrieved
- SourceIPs: tc.SourceIPs,
- TargetIPs: tc.TargetIPs,
- SourcePort: tc.SourcePort,
- TargetPort: tc.TargetPort,
- TargetDomain: tc.TargetDomain,
- OutboundGroupTags: tc.OutboundGroupTags,
- OutboundTag: tc.OutboundTag,
- }
- if r := cmp.Diff(msg, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
- t.Error(r)
- }
- }
- return nil
- }
- if err := testRetrievingAllFields(); err != nil {
- errCh <- err
- }
- if err := testRetrievingSubsetOfFields(); err != nil {
- errCh <- err
- }
- errCh <- nil // Client passed all tests successfully
- }()
- // Wait for goroutines to complete
- select {
- case <-time.After(2 * time.Second):
- t.Fatal("Test timeout after 2s")
- case err := <-errCh:
- if err != nil {
- t.Fatal(err)
- }
- }
- }
- func TestSerivceTestRoute(t *testing.T) {
- c := stats.NewChannel(&stats.ChannelConfig{
- SubscriberLimit: 1,
- BufferSize: 16,
- Blocking: true,
- })
- common.Must(c.Start())
- defer c.Close()
- r := new(router.Router)
- mockCtl := gomock.NewController(t)
- defer mockCtl.Finish()
- common.Must(r.Init(&router.Config{
- Rule: []*router.RoutingRule{
- {
- InboundTag: []string{"in"},
- TargetTag: &router.RoutingRule_Tag{Tag: "out"},
- },
- {
- Protocol: []string{"bittorrent"},
- TargetTag: &router.RoutingRule_Tag{Tag: "blocked"},
- },
- {
- PortList: &net.PortList{Range: []*net.PortRange{{From: 8080, To: 8080}}},
- TargetTag: &router.RoutingRule_Tag{Tag: "out"},
- },
- {
- SourcePortList: &net.PortList{Range: []*net.PortRange{{From: 9999, To: 9999}}},
- TargetTag: &router.RoutingRule_Tag{Tag: "out"},
- },
- {
- Domain: []*router.Domain{{Type: router.Domain_Domain, Value: "com"}},
- TargetTag: &router.RoutingRule_Tag{Tag: "out"},
- },
- {
- SourceGeoip: []*router.GeoIP{{CountryCode: "private", Cidr: []*router.CIDR{{Ip: []byte{127, 0, 0, 0}, Prefix: 8}}}},
- TargetTag: &router.RoutingRule_Tag{Tag: "out"},
- },
- {
- UserEmail: []string{"[email protected]"},
- TargetTag: &router.RoutingRule_Tag{Tag: "out"},
- },
- {
- Networks: []net.Network{net.Network_UDP, net.Network_TCP},
- TargetTag: &router.RoutingRule_Tag{Tag: "out"},
- },
- },
- }, mocks.NewDNSClient(mockCtl), mocks.NewOutboundManager(mockCtl)))
- lis := bufconn.Listen(1024 * 1024)
- bufDialer := func(context.Context, string) (net.Conn, error) {
- return lis.Dial()
- }
- errCh := make(chan error)
- // Server goroutine
- go func() {
- server := grpc.NewServer()
- RegisterRoutingServiceServer(server, NewRoutingServer(r, c))
- errCh <- server.Serve(lis)
- }()
- // Client goroutine
- go func() {
- defer lis.Close()
- conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
- if err != nil {
- errCh <- err
- }
- defer conn.Close()
- client := NewRoutingServiceClient(conn)
- testCases := []*RoutingContext{
- {InboundTag: "in", OutboundTag: "out"},
- {TargetIPs: [][]byte{{1, 2, 3, 4}}, TargetPort: 8080, OutboundTag: "out"},
- {TargetDomain: "example.com", TargetPort: 443, OutboundTag: "out"},
- {SourcePort: 9999, TargetPort: 9999, OutboundTag: "out"},
- {Network: net.Network_UDP, Protocol: "bittorrent", OutboundTag: "blocked"},
- {User: "[email protected]", OutboundTag: "out"},
- {SourceIPs: [][]byte{{127, 0, 0, 1}}, Attributes: map[string]string{"attr": "value"}, OutboundTag: "out"},
- }
- // Test simple TestRoute
- testSimple := func() error {
- for _, tc := range testCases {
- route, err := client.TestRoute(context.Background(), &TestRouteRequest{RoutingContext: tc})
- if err != nil {
- return err
- }
- if r := cmp.Diff(route, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
- t.Error(r)
- }
- }
- return nil
- }
- // Test TestRoute with special options
- testOptions := func() error {
- sub, err := c.Subscribe()
- if err != nil {
- return err
- }
- for _, tc := range testCases {
- route, err := client.TestRoute(context.Background(), &TestRouteRequest{
- RoutingContext: tc,
- FieldSelectors: []string{"ip", "port", "domain", "outbound"},
- PublishResult: true,
- })
- if err != nil {
- return err
- }
- stat := &RoutingContext{ // Only a subset of stats is retrieved
- SourceIPs: tc.SourceIPs,
- TargetIPs: tc.TargetIPs,
- SourcePort: tc.SourcePort,
- TargetPort: tc.TargetPort,
- TargetDomain: tc.TargetDomain,
- OutboundGroupTags: tc.OutboundGroupTags,
- OutboundTag: tc.OutboundTag,
- }
- if r := cmp.Diff(route, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
- t.Error(r)
- }
- select { // Check that routing result has been published to statistics channel
- case msg, received := <-sub:
- if route, ok := msg.(routing.Route); received && ok {
- if r := cmp.Diff(AsProtobufMessage(nil)(route), tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" {
- t.Error(r)
- }
- } else {
- t.Error("unexpected failure in receiving published routing result for testcase", tc)
- }
- case <-time.After(100 * time.Millisecond):
- t.Error("unexpected failure in receiving published routing result", tc)
- }
- }
- return nil
- }
- if err := testSimple(); err != nil {
- errCh <- err
- }
- if err := testOptions(); err != nil {
- errCh <- err
- }
- errCh <- nil // Client passed all tests successfully
- }()
- // Wait for goroutines to complete
- select {
- case <-time.After(2 * time.Second):
- t.Fatal("Test timeout after 2s")
- case err := <-errCh:
- if err != nil {
- t.Fatal(err)
- }
- }
- }
|