123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- package outbound_test
- import (
- "context"
- "fmt"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/xtls/xray-core/app/policy"
- "github.com/xtls/xray-core/app/proxyman"
- . "github.com/xtls/xray-core/app/proxyman/outbound"
- "github.com/xtls/xray-core/app/stats"
- "github.com/xtls/xray-core/common/net"
- "github.com/xtls/xray-core/common/serial"
- "github.com/xtls/xray-core/common/session"
- core "github.com/xtls/xray-core/core"
- "github.com/xtls/xray-core/features/outbound"
- "github.com/xtls/xray-core/proxy/freedom"
- "github.com/xtls/xray-core/transport/internet/stat"
- )
- func TestInterfaces(t *testing.T) {
- _ = (outbound.Handler)(new(Handler))
- _ = (outbound.Manager)(new(Manager))
- }
- const xrayKey core.XrayKey = 1
- func TestOutboundWithoutStatCounter(t *testing.T) {
- config := &core.Config{
- App: []*serial.TypedMessage{
- serial.ToTypedMessage(&stats.Config{}),
- serial.ToTypedMessage(&policy.Config{
- System: &policy.SystemPolicy{
- Stats: &policy.SystemPolicy_Stats{
- InboundUplink: true,
- },
- },
- }),
- },
- }
- v, _ := core.New(config)
- v.AddFeature((outbound.Manager)(new(Manager)))
- ctx := context.WithValue(context.Background(), xrayKey, v)
- ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{{}})
- h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
- Tag: "tag",
- ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
- })
- conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
- _, ok := conn.(*stat.CounterConnection)
- if ok {
- t.Errorf("Expected conn to not be CounterConnection")
- }
- }
- func TestOutboundWithStatCounter(t *testing.T) {
- config := &core.Config{
- App: []*serial.TypedMessage{
- serial.ToTypedMessage(&stats.Config{}),
- serial.ToTypedMessage(&policy.Config{
- System: &policy.SystemPolicy{
- Stats: &policy.SystemPolicy_Stats{
- OutboundUplink: true,
- OutboundDownlink: true,
- },
- },
- }),
- },
- }
- v, _ := core.New(config)
- v.AddFeature((outbound.Manager)(new(Manager)))
- ctx := context.WithValue(context.Background(), xrayKey, v)
- ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{{}})
- h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
- Tag: "tag",
- ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
- })
- conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
- _, ok := conn.(*stat.CounterConnection)
- if !ok {
- t.Errorf("Expected conn to be CounterConnection")
- }
- }
- func TestTagsCache(t *testing.T) {
- test_duration := 10 * time.Second
- threads_num := 50
- delay := 10 * time.Millisecond
- tags_prefix := "node"
- tags := sync.Map{}
- counter := atomic.Uint64{}
- ohm, err := New(context.Background(), &proxyman.OutboundConfig{})
- if err != nil {
- t.Error("failed to create outbound handler manager")
- }
- config := &core.Config{
- App: []*serial.TypedMessage{},
- }
- v, _ := core.New(config)
- v.AddFeature(ohm)
- ctx := context.WithValue(context.Background(), xrayKey, v)
- stop_add_rm := false
- wg_add_rm := sync.WaitGroup{}
- addHandlers := func() {
- defer wg_add_rm.Done()
- for !stop_add_rm {
- time.Sleep(delay)
- idx := counter.Add(1)
- tag := fmt.Sprintf("%s%d", tags_prefix, idx)
- cfg := &core.OutboundHandlerConfig{
- Tag: tag,
- ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
- }
- if h, err := NewHandler(ctx, cfg); err == nil {
- if err := ohm.AddHandler(ctx, h); err == nil {
- // t.Log("add handler:", tag)
- tags.Store(tag, nil)
- } else {
- t.Error("failed to add handler:", tag)
- }
- } else {
- t.Error("failed to create handler:", tag)
- }
- }
- }
- rmHandlers := func() {
- defer wg_add_rm.Done()
- for !stop_add_rm {
- time.Sleep(delay)
- tags.Range(func(key interface{}, value interface{}) bool {
- if _, ok := tags.LoadAndDelete(key); ok {
- // t.Log("remove handler:", key)
- ohm.RemoveHandler(ctx, key.(string))
- return false
- }
- return true
- })
- }
- }
- selectors := []string{tags_prefix}
- wg_get := sync.WaitGroup{}
- stop_get := false
- getTags := func() {
- defer wg_get.Done()
- for !stop_get {
- time.Sleep(delay)
- _ = ohm.Select(selectors)
- // t.Logf("get tags: %v", tag)
- }
- }
- for i := 0; i < threads_num; i++ {
- wg_add_rm.Add(2)
- go rmHandlers()
- go addHandlers()
- wg_get.Add(1)
- go getTags()
- }
- time.Sleep(test_duration)
- stop_add_rm = true
- wg_add_rm.Wait()
- stop_get = true
- wg_get.Wait()
- }
|