Explorar el Código

feat: metrics including pprof, expvars

yichya hace 3 años
padre
commit
35eb165f63

+ 148 - 0
app/metrics/config.pb.go

@@ -0,0 +1,148 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// 	protoc-gen-go v1.27.1
+// 	protoc        v3.18.0
+// source: config.proto
+
+package metrics
+
+import (
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// Config is the settings for metrics.
+type Config struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	// Tag of the outbound handler that handles metrics http connections.
+	Tag string `protobuf:"bytes,1,opt,name=tag,proto3" json:"tag,omitempty"`
+}
+
+func (x *Config) Reset() {
+	*x = Config{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_config_proto_msgTypes[0]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *Config) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Config) ProtoMessage() {}
+
+func (x *Config) ProtoReflect() protoreflect.Message {
+	mi := &file_config_proto_msgTypes[0]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use Config.ProtoReflect.Descriptor instead.
+func (*Config) Descriptor() ([]byte, []int) {
+	return file_config_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *Config) GetTag() string {
+	if x != nil {
+		return x.Tag
+	}
+	return ""
+}
+
+var File_config_proto protoreflect.FileDescriptor
+
+var file_config_proto_rawDesc = []byte{
+	0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10,
+	0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73,
+	0x22, 0x1a, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x61,
+	0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x74, 0x61, 0x67, 0x42, 0x52, 0x0a, 0x14,
+	0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6d, 0x65, 0x74,
+	0x72, 0x69, 0x63, 0x73, 0x50, 0x01, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
+	0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72,
+	0x65, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0xaa, 0x02, 0x10,
+	0x58, 0x72, 0x61, 0x79, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73,
+	0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+	file_config_proto_rawDescOnce sync.Once
+	file_config_proto_rawDescData = file_config_proto_rawDesc
+)
+
+func file_config_proto_rawDescGZIP() []byte {
+	file_config_proto_rawDescOnce.Do(func() {
+		file_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_config_proto_rawDescData)
+	})
+	return file_config_proto_rawDescData
+}
+
+var file_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
+var file_config_proto_goTypes = []interface{}{
+	(*Config)(nil), // 0: xray.app.metrics.Config
+}
+var file_config_proto_depIdxs = []int32{
+	0, // [0:0] is the sub-list for method output_type
+	0, // [0:0] is the sub-list for method input_type
+	0, // [0:0] is the sub-list for extension type_name
+	0, // [0:0] is the sub-list for extension extendee
+	0, // [0:0] is the sub-list for field type_name
+}
+
+func init() { file_config_proto_init() }
+func file_config_proto_init() {
+	if File_config_proto != nil {
+		return
+	}
+	if !protoimpl.UnsafeEnabled {
+		file_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*Config); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+	}
+	type x struct{}
+	out := protoimpl.TypeBuilder{
+		File: protoimpl.DescBuilder{
+			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+			RawDescriptor: file_config_proto_rawDesc,
+			NumEnums:      0,
+			NumMessages:   1,
+			NumExtensions: 0,
+			NumServices:   0,
+		},
+		GoTypes:           file_config_proto_goTypes,
+		DependencyIndexes: file_config_proto_depIdxs,
+		MessageInfos:      file_config_proto_msgTypes,
+	}.Build()
+	File_config_proto = out.File
+	file_config_proto_rawDesc = nil
+	file_config_proto_goTypes = nil
+	file_config_proto_depIdxs = nil
+}

+ 13 - 0
app/metrics/config.proto

@@ -0,0 +1,13 @@
+syntax = "proto3";
+
+package xray.app.metrics;
+option csharp_namespace = "Xray.App.Metrics";
+option go_package = "github.com/xtls/xray-core/app/metrics";
+option java_package = "com.xray.app.metrics";
+option java_multiple_files = true;
+
+// Config is the settings for metrics.
+message Config {
+  // Tag of the outbound handler that handles metrics http connections.
+  string tag = 1;
+}

+ 9 - 0
app/metrics/errors.generated.go

@@ -0,0 +1,9 @@
+package metrics
+
+import "github.com/xtls/xray-core/common/errors"
+
+type errPathObjHolder struct{}
+
+func newError(values ...interface{}) *errors.Error {
+	return errors.New(values...).WithPathObj(errPathObjHolder{})
+}

+ 115 - 0
app/metrics/metrics.go

@@ -0,0 +1,115 @@
+package metrics
+
+import (
+	"context"
+	"expvar"
+	"net/http"
+	_ "net/http/pprof"
+	"strings"
+
+	"github.com/xtls/xray-core/app/observatory"
+	"github.com/xtls/xray-core/app/stats"
+	"github.com/xtls/xray-core/common"
+	"github.com/xtls/xray-core/common/net"
+	"github.com/xtls/xray-core/common/signal/done"
+	"github.com/xtls/xray-core/core"
+	"github.com/xtls/xray-core/features/extension"
+	"github.com/xtls/xray-core/features/outbound"
+	feature_stats "github.com/xtls/xray-core/features/stats"
+)
+
+type MetricsHandler struct {
+	ohm          outbound.Manager
+	statsManager feature_stats.Manager
+	observatory  extension.Observatory
+	tag          string
+}
+
+// NewMetricsHandler creates a new MetricsHandler based on the given config.
+func NewMetricsHandler(ctx context.Context, config *Config) (*MetricsHandler, error) {
+	c := &MetricsHandler{
+		tag: config.Tag,
+	}
+	common.Must(core.RequireFeatures(ctx, func(om outbound.Manager, sm feature_stats.Manager) {
+		c.statsManager = sm
+		c.ohm = om
+	}))
+	expvar.Publish("stats", expvar.Func(func() interface{} {
+		manager, ok := c.statsManager.(*stats.Manager)
+		if !ok {
+			return nil
+		}
+		var resp = map[string]map[string]map[string]int64{
+			"inbound":  {},
+			"outbound": {},
+			"user":     {},
+		}
+		manager.VisitCounters(func(name string, counter feature_stats.Counter) bool {
+			nameSplit := strings.Split(name, ">>>")
+			typeName, tagOrUser, direction := nameSplit[0], nameSplit[1], nameSplit[3]
+			if item, found := resp[typeName][tagOrUser]; found {
+				item[direction] = counter.Value()
+			} else {
+				resp[typeName][tagOrUser] = map[string]int64{
+					direction: counter.Value(),
+				}
+			}
+			return true
+		})
+		return resp
+	}))
+	expvar.Publish("observatory", expvar.Func(func() interface{} {
+		if c.observatory == nil {
+			common.Must(core.RequireFeatures(ctx, func(observatory extension.Observatory) error {
+				c.observatory = observatory
+				return nil
+			}))
+		}
+		var resp = map[string]*observatory.OutboundStatus{}
+		if o, err := c.observatory.GetObservation(context.Background()); err != nil {
+			return err
+		} else {
+			for _, x := range o.(*observatory.ObservationResult).GetStatus() {
+				resp[x.OutboundTag] = x
+			}
+		}
+		return resp
+	}))
+	return c, nil
+}
+
+func (p *MetricsHandler) Type() interface{} {
+	return (*MetricsHandler)(nil)
+}
+
+func (p *MetricsHandler) Start() error {
+	listener := &OutboundListener{
+		buffer: make(chan net.Conn, 4),
+		done:   done.New(),
+	}
+
+	go func() {
+		if err := http.Serve(listener, http.DefaultServeMux); err != nil {
+			newError("failed to start metrics server").Base(err).AtError().WriteToLog()
+		}
+	}()
+
+	if err := p.ohm.RemoveHandler(context.Background(), p.tag); err != nil {
+		newError("failed to remove existing handler").WriteToLog()
+	}
+
+	return p.ohm.AddHandler(context.Background(), &Outbound{
+		tag:      p.tag,
+		listener: listener,
+	})
+}
+
+func (p *MetricsHandler) Close() error {
+	return nil
+}
+
+func init() {
+	common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) {
+		return NewMetricsHandler(ctx, cfg.(*Config))
+	}))
+}

+ 109 - 0
app/metrics/outbound.go

@@ -0,0 +1,109 @@
+package metrics
+
+import (
+	"context"
+	"sync"
+
+	"github.com/xtls/xray-core/common"
+	"github.com/xtls/xray-core/common/net"
+	"github.com/xtls/xray-core/common/net/cnc"
+	"github.com/xtls/xray-core/common/signal/done"
+	"github.com/xtls/xray-core/transport"
+)
+
+// OutboundListener is a net.Listener for listening pprof http connections.
+type OutboundListener struct {
+	buffer chan net.Conn
+	done   *done.Instance
+}
+
+func (l *OutboundListener) add(conn net.Conn) {
+	select {
+	case l.buffer <- conn:
+	case <-l.done.Wait():
+		conn.Close()
+	default:
+		conn.Close()
+	}
+}
+
+// Accept implements net.Listener.
+func (l *OutboundListener) Accept() (net.Conn, error) {
+	select {
+	case <-l.done.Wait():
+		return nil, newError("listen closed")
+	case c := <-l.buffer:
+		return c, nil
+	}
+}
+
+// Close implement net.Listener.
+func (l *OutboundListener) Close() error {
+	common.Must(l.done.Close())
+L:
+	for {
+		select {
+		case c := <-l.buffer:
+			c.Close()
+		default:
+			break L
+		}
+	}
+	return nil
+}
+
+// Addr implements net.Listener.
+func (l *OutboundListener) Addr() net.Addr {
+	return &net.TCPAddr{
+		IP:   net.IP{0, 0, 0, 0},
+		Port: 0,
+	}
+}
+
+// Outbound is a outbound.Handler that handles pprof http connections.
+type Outbound struct {
+	tag      string
+	listener *OutboundListener
+	access   sync.RWMutex
+	closed   bool
+}
+
+// Dispatch implements outbound.Handler.
+func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
+	co.access.RLock()
+
+	if co.closed {
+		common.Interrupt(link.Reader)
+		common.Interrupt(link.Writer)
+		co.access.RUnlock()
+		return
+	}
+
+	closeSignal := done.New()
+	c := cnc.NewConnection(cnc.ConnectionInputMulti(link.Writer), cnc.ConnectionOutputMulti(link.Reader), cnc.ConnectionOnClose(closeSignal))
+	co.listener.add(c)
+	co.access.RUnlock()
+	<-closeSignal.Wait()
+}
+
+// Tag implements outbound.Handler.
+func (co *Outbound) Tag() string {
+	return co.tag
+}
+
+// Start implements common.Runnable.
+func (co *Outbound) Start() error {
+	co.access.Lock()
+	co.closed = false
+	co.access.Unlock()
+	return nil
+}
+
+// Close implements common.Closable.
+func (co *Outbound) Close() error {
+	co.access.Lock()
+	defer co.access.Unlock()
+
+	co.closed = true
+	return co.listener.Close()
+}

+ 19 - 0
infra/conf/metrics.go

@@ -0,0 +1,19 @@
+package conf
+
+import (
+	"github.com/xtls/xray-core/app/metrics"
+)
+
+type MetricsConfig struct {
+	Tag string `json:"tag"`
+}
+
+func (c *MetricsConfig) Build() (*metrics.Config, error) {
+	if c.Tag == "" {
+		return nil, newError("metrics tag can't be empty.")
+	}
+
+	return &metrics.Config{
+		Tag: c.Tag,
+	}, nil
+}

+ 11 - 1
infra/conf/xray.go

@@ -411,6 +411,7 @@ type Config struct {
 	Transport       *TransportConfig       `json:"transport"`
 	Policy          *PolicyConfig          `json:"policy"`
 	API             *APIConfig             `json:"api"`
+	Metrics         *MetricsConfig         `json:"metrics"`
 	Stats           *StatsConfig           `json:"stats"`
 	Reverse         *ReverseConfig         `json:"reverse"`
 	FakeDNS         *FakeDNSConfig         `json:"fakeDns"`
@@ -461,6 +462,9 @@ func (c *Config) Override(o *Config, fn string) {
 	if o.API != nil {
 		c.API = o.API
 	}
+	if o.Metrics != nil {
+		c.Metrics = o.Metrics
+	}
 	if o.Stats != nil {
 		c.Stats = o.Stats
 	}
@@ -566,7 +570,13 @@ func (c *Config) Build() (*core.Config, error) {
 		}
 		config.App = append(config.App, serial.ToTypedMessage(apiConf))
 	}
-
+	if c.Metrics != nil {
+		metricsConf, err := c.Metrics.Build()
+		if err != nil {
+			return nil, err
+		}
+		config.App = append(config.App, serial.ToTypedMessage(metricsConf))
+	}
 	if c.Stats != nil {
 		statsConf, err := c.Stats.Build()
 		if err != nil {

+ 1 - 0
main/distro/all/all.go

@@ -21,6 +21,7 @@ import (
 	_ "github.com/xtls/xray-core/app/dns"
 	_ "github.com/xtls/xray-core/app/dns/fakedns"
 	_ "github.com/xtls/xray-core/app/log"
+	_ "github.com/xtls/xray-core/app/metrics"
 	_ "github.com/xtls/xray-core/app/policy"
 	_ "github.com/xtls/xray-core/app/reverse"
 	_ "github.com/xtls/xray-core/app/router"

+ 105 - 0
testing/scenarios/metrics_test.go

@@ -0,0 +1,105 @@
+package scenarios
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"testing"
+
+	"github.com/xtls/xray-core/app/metrics"
+	"github.com/xtls/xray-core/app/proxyman"
+	"github.com/xtls/xray-core/app/router"
+	"github.com/xtls/xray-core/common"
+	"github.com/xtls/xray-core/common/net"
+	"github.com/xtls/xray-core/common/serial"
+	"github.com/xtls/xray-core/core"
+	"github.com/xtls/xray-core/proxy/dokodemo"
+	"github.com/xtls/xray-core/proxy/freedom"
+	"github.com/xtls/xray-core/testing/servers/tcp"
+)
+
+const expectedMessage = "goroutine profile: total"
+
+func TestMetrics(t *testing.T) {
+	tcpServer := tcp.Server{
+		MsgProcessor: xor,
+	}
+	dest, err := tcpServer.Start()
+	common.Must(err)
+	defer tcpServer.Close()
+
+	metricsPort := tcp.PickPort()
+	clientConfig := &core.Config{
+		App: []*serial.TypedMessage{
+			serial.ToTypedMessage(&metrics.Config{
+				Tag: "metrics_out",
+			}),
+			serial.ToTypedMessage(&router.Config{
+				Rule: []*router.RoutingRule{
+					{
+						InboundTag: []string{"metrics_in"},
+						TargetTag: &router.RoutingRule_Tag{
+							Tag: "metrics_out",
+						},
+					},
+				},
+			}),
+		},
+		Inbound: []*core.InboundHandlerConfig{
+			{
+				Tag: "metrics_in",
+				ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
+					PortList: &net.PortList{Range: []*net.PortRange{net.SinglePortRange(metricsPort)}},
+					Listen:   net.NewIPOrDomain(net.LocalHostIP),
+				}),
+				ProxySettings: serial.ToTypedMessage(&dokodemo.Config{
+					Address:  net.NewIPOrDomain(dest.Address),
+					Port:     uint32(dest.Port),
+					Networks: []net.Network{net.Network_TCP},
+				}),
+			},
+		},
+		Outbound: []*core.OutboundHandlerConfig{
+			{
+				Tag:           "default-outbound",
+				ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
+			},
+		},
+	}
+
+	servers, err := InitializeServerConfigs(clientConfig)
+	common.Must(err)
+	defer CloseAllServers(servers)
+
+	resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/debug/pprof/goroutine?debug=1", metricsPort))
+	common.Must(err)
+	if resp == nil {
+		t.Error("unexpected pprof nil response")
+	}
+	if resp.StatusCode != http.StatusOK {
+		t.Error("unexpected pprof status code")
+	}
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if string(body)[0:len(expectedMessage)] != expectedMessage {
+		t.Error("unexpected response body from pprof handler")
+	}
+
+	resp2, err2 := http.Get(fmt.Sprintf("http://127.0.0.1:%d/debug/vars", metricsPort))
+	common.Must(err2)
+	if resp2 == nil {
+		t.Error("unexpected expvars nil response")
+	}
+	if resp2.StatusCode != http.StatusOK {
+		t.Error("unexpected expvars status code")
+	}
+	body2, err2 := ioutil.ReadAll(resp2.Body)
+	if err2 != nil {
+		t.Fatal(err2)
+	}
+	if string(body2)[0] != '{' {
+		t.Error("unexpected response body from expvars handler")
+	}
+}