Przeglądaj źródła

Add shadowsocks-multiuser control api

世界 3 lat temu
rodzic
commit
c240f1b359

+ 6 - 12
common/badjson/array.go

@@ -2,19 +2,18 @@ package badjson
 
 import (
 	"bytes"
-	"reflect"
 
 	"github.com/sagernet/sing-box/common/json"
 	E "github.com/sagernet/sing/common/exceptions"
 )
 
-type JSONArray[T any] []T
+type JSONArray []any
 
-func (a JSONArray[T]) MarshalJSON() ([]byte, error) {
-	return json.Marshal([]T(a))
+func (a JSONArray) MarshalJSON() ([]byte, error) {
+	return json.Marshal([]any(a))
 }
 
-func (a *JSONArray[T]) UnmarshalJSON(content []byte) error {
+func (a *JSONArray) UnmarshalJSON(content []byte) error {
 	decoder := json.NewDecoder(bytes.NewReader(content))
 	arrayStart, err := decoder.Token()
 	if err != nil {
@@ -35,17 +34,12 @@ func (a *JSONArray[T]) UnmarshalJSON(content []byte) error {
 	return nil
 }
 
-func (a *JSONArray[T]) decodeJSON(decoder *json.Decoder) error {
+func (a *JSONArray) decodeJSON(decoder *json.Decoder) error {
 	for decoder.More() {
-		value, err := decodeJSON(decoder)
+		item, err := decodeJSON(decoder)
 		if err != nil {
 			return err
 		}
-		item, ok := value.(T)
-		if !ok {
-			var defValue T
-			return E.New("can't cast ", value, " to ", reflect.TypeOf(defValue))
-		}
 		*a = append(*a, item)
 	}
 	return nil

+ 8 - 1
common/badjson/json.go

@@ -1,10 +1,17 @@
 package badjson
 
 import (
+	"bytes"
+
 	"github.com/sagernet/sing-box/common/json"
 	E "github.com/sagernet/sing/common/exceptions"
 )
 
+func Decode(content []byte) (any, error) {
+	decoder := json.NewDecoder(bytes.NewReader(content))
+	return decodeJSON(decoder)
+}
+
 func decodeJSON(decoder *json.Decoder) (any, error) {
 	rawToken, err := decoder.Token()
 	if err != nil {
@@ -27,7 +34,7 @@ func decodeJSON(decoder *json.Decoder) (any, error) {
 			}
 			return &object, nil
 		case '[':
-			var array JSONArray[any]
+			var array JSONArray
 			err = array.decodeJSON(decoder)
 			if err != nil {
 				return nil, err

+ 57 - 0
common/pipelistener/listener.go

@@ -0,0 +1,57 @@
+package pipelistener
+
+import (
+	"io"
+	"net"
+)
+
+var _ net.Listener = (*Listener)(nil)
+
+type Listener struct {
+	pipe chan net.Conn
+	done chan struct{}
+}
+
+func New(channelSize int) *Listener {
+	return &Listener{
+		pipe: make(chan net.Conn, channelSize),
+		done: make(chan struct{}),
+	}
+}
+
+func (l *Listener) Serve(conn net.Conn) {
+	l.pipe <- conn
+}
+
+func (l *Listener) Accept() (net.Conn, error) {
+	select {
+	case conn := <-l.pipe:
+		return conn, nil
+	case <-l.done:
+		return nil, io.ErrClosedPipe
+	}
+}
+
+func (l *Listener) Close() error {
+	select {
+	case <-l.done:
+		return io.ErrClosedPipe
+	default:
+	}
+	close(l.done)
+	return nil
+}
+
+func (l *Listener) Addr() net.Addr {
+	return addr{}
+}
+
+type addr struct{}
+
+func (a addr) Network() string {
+	return "pipe"
+}
+
+func (a addr) String() string {
+	return "pipe"
+}

+ 145 - 0
common/trafficcontrol/manager.go

@@ -0,0 +1,145 @@
+package trafficcontrol
+
+import (
+	"io"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"github.com/sagernet/sing/common/buf"
+	"github.com/sagernet/sing/common/bufio"
+	M "github.com/sagernet/sing/common/metadata"
+	N "github.com/sagernet/sing/common/network"
+)
+
+type Manager[U comparable] struct {
+	access sync.Mutex
+	users  map[U]*Traffic
+}
+
+type Traffic struct {
+	Upload   uint64
+	Download uint64
+}
+
+func NewManager[U comparable]() *Manager[U] {
+	return &Manager[U]{
+		users: make(map[U]*Traffic),
+	}
+}
+
+func (m *Manager[U]) Reset() {
+	m.users = make(map[U]*Traffic)
+}
+
+func (m *Manager[U]) TrackConnection(user U, conn net.Conn) net.Conn {
+	m.access.Lock()
+	defer m.access.Unlock()
+	var traffic *Traffic
+	if t, loaded := m.users[user]; loaded {
+		traffic = t
+	} else {
+		traffic = new(Traffic)
+		m.users[user] = traffic
+	}
+	return &TrackConn{conn, traffic}
+}
+
+func (m *Manager[U]) TrackPacketConnection(user U, conn N.PacketConn) N.PacketConn {
+	m.access.Lock()
+	defer m.access.Unlock()
+	var traffic *Traffic
+	if t, loaded := m.users[user]; loaded {
+		traffic = t
+	} else {
+		traffic = new(Traffic)
+		m.users[user] = traffic
+	}
+	return &TrackPacketConn{conn, traffic}
+}
+
+func (m *Manager[U]) ReadTraffics() map[U]Traffic {
+	m.access.Lock()
+	defer m.access.Unlock()
+
+	trafficMap := make(map[U]Traffic)
+	for user, traffic := range m.users {
+		upload := atomic.SwapUint64(&traffic.Upload, 0)
+		download := atomic.SwapUint64(&traffic.Download, 0)
+		if upload == 0 && download == 0 {
+			continue
+		}
+		trafficMap[user] = Traffic{
+			Upload:   upload,
+			Download: download,
+		}
+	}
+	return trafficMap
+}
+
+type TrackConn struct {
+	net.Conn
+	*Traffic
+}
+
+func (c *TrackConn) Read(p []byte) (n int, err error) {
+	n, err = c.Conn.Read(p)
+	if n > 0 {
+		atomic.AddUint64(&c.Upload, uint64(n))
+	}
+	return
+}
+
+func (c *TrackConn) Write(p []byte) (n int, err error) {
+	n, err = c.Conn.Write(p)
+	if n > 0 {
+		atomic.AddUint64(&c.Download, uint64(n))
+	}
+	return
+}
+
+func (c *TrackConn) WriteTo(w io.Writer) (n int64, err error) {
+	n, err = bufio.Copy(w, c.Conn)
+	if n > 0 {
+		atomic.AddUint64(&c.Upload, uint64(n))
+	}
+	return
+}
+
+func (c *TrackConn) ReadFrom(r io.Reader) (n int64, err error) {
+	n, err = bufio.Copy(c.Conn, r)
+	if n > 0 {
+		atomic.AddUint64(&c.Download, uint64(n))
+	}
+	return
+}
+
+func (c *TrackConn) Upstream() any {
+	return c.Conn
+}
+
+type TrackPacketConn struct {
+	N.PacketConn
+	*Traffic
+}
+
+func (c *TrackPacketConn) ReadPacket(buffer *buf.Buffer) (M.Socksaddr, error) {
+	destination, err := c.PacketConn.ReadPacket(buffer)
+	if err == nil {
+		atomic.AddUint64(&c.Upload, uint64(buffer.Len()))
+	}
+	return destination, err
+}
+
+func (c *TrackPacketConn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
+	n := buffer.Len()
+	err := c.PacketConn.WritePacket(buffer, destination)
+	if err == nil {
+		atomic.AddUint64(&c.Download, uint64(n))
+	}
+	return err
+}
+
+func (c *TrackPacketConn) Upstream() any {
+	return c.PacketConn
+}

+ 94 - 0
inbound/shadowsocks_control.go

@@ -0,0 +1,94 @@
+package inbound
+
+import (
+	"encoding/json"
+	"io"
+	"net/http"
+
+	C "github.com/sagernet/sing-box/constant"
+	"github.com/sagernet/sing-box/option"
+	"github.com/sagernet/sing/common"
+	E "github.com/sagernet/sing/common/exceptions"
+	F "github.com/sagernet/sing/common/format"
+
+	"github.com/go-chi/chi/v5"
+	"github.com/go-chi/render"
+)
+
+func (h *ShadowsocksMulti) createHandler() http.Handler {
+	router := chi.NewRouter()
+	router.Get("/", h.handleHello)
+	router.Put("/users", h.handleUpdateUsers)
+	router.Get("/traffics", h.handleReadTraffics)
+	return router
+}
+
+func (h *ShadowsocksMulti) handleHello(writer http.ResponseWriter, request *http.Request) {
+	render.JSON(writer, request, render.M{
+		"server":  "sing-box",
+		"version": C.Version,
+	})
+}
+
+func (h *ShadowsocksMulti) handleUpdateUsers(writer http.ResponseWriter, request *http.Request) {
+	var users []option.ShadowsocksUser
+	err := readRequest(request, &users)
+	if err != nil {
+		h.newError(E.Cause(err, "controller: update users: parse request"))
+		writer.WriteHeader(http.StatusBadRequest)
+		writer.Write([]byte(F.ToString(err)))
+		return
+	}
+	users = append([]option.ShadowsocksUser{{
+		Name:     "control",
+		Password: h.users[0].Password,
+	}}, users...)
+	err = h.service.UpdateUsersWithPasswords(common.MapIndexed(users, func(index int, user option.ShadowsocksUser) int {
+		return index
+	}), common.Map(users, func(user option.ShadowsocksUser) string {
+		return user.Password
+	}))
+	if err != nil {
+		h.newError(E.Cause(err, "controller: update users"))
+		writer.WriteHeader(http.StatusBadRequest)
+		writer.Write([]byte(F.ToString(err)))
+		return
+	}
+	h.users = users
+	h.trafficManager.Reset()
+	writer.WriteHeader(http.StatusNoContent)
+	h.logger.Info("controller: updated ", len(users)-1, " users")
+}
+
+type ShadowsocksUserTraffic struct {
+	Name     string `json:"name,omitempty"`
+	Upload   uint64 `json:"upload,omitempty"`
+	Download uint64 `json:"download,omitempty"`
+}
+
+func (h *ShadowsocksMulti) handleReadTraffics(writer http.ResponseWriter, request *http.Request) {
+	h.logger.Debug("controller: traffics sent")
+	trafficMap := h.trafficManager.ReadTraffics()
+	if len(trafficMap) == 0 {
+		writer.WriteHeader(http.StatusNoContent)
+		return
+	}
+	traffics := make([]ShadowsocksUserTraffic, 0, len(trafficMap))
+	for user, traffic := range trafficMap {
+		traffics = append(traffics, ShadowsocksUserTraffic{
+			Name:     h.users[user].Name,
+			Upload:   traffic.Upload,
+			Download: traffic.Download,
+		})
+	}
+	render.JSON(writer, request, traffics)
+}
+
+func readRequest(request *http.Request, v any) error {
+	defer request.Body.Close()
+	content, err := io.ReadAll(request.Body)
+	if err != nil {
+		return err
+	}
+	return json.Unmarshal(content, v)
+}

+ 47 - 4
inbound/shadowsocks_multi.go

@@ -3,9 +3,12 @@ package inbound
 import (
 	"context"
 	"net"
+	"net/http"
 	"os"
 
 	"github.com/sagernet/sing-box/adapter"
+	"github.com/sagernet/sing-box/common/pipelistener"
+	"github.com/sagernet/sing-box/common/trafficcontrol"
 	C "github.com/sagernet/sing-box/constant"
 	"github.com/sagernet/sing-box/log"
 	"github.com/sagernet/sing-box/option"
@@ -13,6 +16,7 @@ import (
 	"github.com/sagernet/sing/common"
 	"github.com/sagernet/sing/common/auth"
 	"github.com/sagernet/sing/common/buf"
+	E "github.com/sagernet/sing/common/exceptions"
 	F "github.com/sagernet/sing/common/format"
 	N "github.com/sagernet/sing/common/network"
 )
@@ -21,8 +25,12 @@ var _ adapter.Inbound = (*ShadowsocksMulti)(nil)
 
 type ShadowsocksMulti struct {
 	myInboundAdapter
-	service *shadowaead_2022.MultiService[int]
-	users   []option.ShadowsocksUser
+	service        *shadowaead_2022.MultiService[int]
+	users          []option.ShadowsocksUser
+	controlEnabled bool
+	controller     *http.Server
+	controllerPipe *pipelistener.Listener
+	trafficManager *trafficcontrol.Manager[int]
 }
 
 func newShadowsocksMulti(ctx context.Context, router adapter.Router, logger log.ContextLogger, tag string, options option.ShadowsocksInboundOptions) (*ShadowsocksMulti, error) {
@@ -36,7 +44,6 @@ func newShadowsocksMulti(ctx context.Context, router adapter.Router, logger log.
 			tag:           tag,
 			listenOptions: options.ListenOptions,
 		},
-		users: options.Users,
 	}
 	inbound.connHandler = inbound
 	inbound.packetHandler = inbound
@@ -52,10 +59,20 @@ func newShadowsocksMulti(ctx context.Context, router adapter.Router, logger log.
 		udpTimeout,
 		adapter.NewUpstreamContextHandler(inbound.newConnection, inbound.newPacketConnection, inbound),
 	)
+	users := options.Users
+	if options.ControlPassword != "" {
+		inbound.controlEnabled = true
+		users = append([]option.ShadowsocksUser{{
+			Name:     "control",
+			Password: options.ControlPassword,
+		}}, users...)
+		inbound.controller = &http.Server{Handler: inbound.createHandler()}
+		inbound.trafficManager = trafficcontrol.NewManager[int]()
+	}
 	if err != nil {
 		return nil, err
 	}
-	err = service.UpdateUsersWithPasswords(common.MapIndexed(options.Users, func(index int, user option.ShadowsocksUser) int {
+	err = service.UpdateUsersWithPasswords(common.MapIndexed(users, func(index int, user option.ShadowsocksUser) int {
 		return index
 	}), common.Map(options.Users, func(user option.ShadowsocksUser) string {
 		return user.Password
@@ -65,9 +82,30 @@ func newShadowsocksMulti(ctx context.Context, router adapter.Router, logger log.
 	}
 	inbound.service = service
 	inbound.packetUpstream = service
+	inbound.users = users
 	return inbound, err
 }
 
+func (h *ShadowsocksMulti) Start() error {
+	if h.controlEnabled {
+		h.controllerPipe = pipelistener.New(16)
+		go func() {
+			err := h.controller.Serve(h.controllerPipe)
+			if err != nil {
+				h.newError(E.Cause(err, "controller serve error"))
+			}
+		}()
+	}
+	return h.myInboundAdapter.Start()
+}
+
+func (h *ShadowsocksMulti) Close() error {
+	if h.controlEnabled {
+		h.controllerPipe.Close()
+	}
+	return h.myInboundAdapter.Close()
+}
+
 func (h *ShadowsocksMulti) NewConnection(ctx context.Context, conn net.Conn, metadata adapter.InboundContext) error {
 	return h.service.NewConnection(adapter.WithContext(log.ContextWithNewID(ctx), &metadata), conn, adapter.UpstreamMetadata(metadata))
 }
@@ -81,6 +119,11 @@ func (h *ShadowsocksMulti) newConnection(ctx context.Context, conn net.Conn, met
 	if !loaded {
 		return os.ErrInvalid
 	}
+	if userIndex == 0 && h.controlEnabled {
+		h.logger.InfoContext(ctx, "inbound control connection")
+		h.controllerPipe.Serve(conn)
+		return nil
+	}
 	user := h.users[userIndex].Name
 	if user == "" {
 		user = F.ToString(userIndex)

+ 6 - 5
option/shadowsocks.go

@@ -2,11 +2,12 @@ package option
 
 type ShadowsocksInboundOptions struct {
 	ListenOptions
-	Network      NetworkList              `json:"network,omitempty"`
-	Method       string                   `json:"method"`
-	Password     string                   `json:"password"`
-	Users        []ShadowsocksUser        `json:"users,omitempty"`
-	Destinations []ShadowsocksDestination `json:"destinations,omitempty"`
+	Network         NetworkList              `json:"network,omitempty"`
+	Method          string                   `json:"method"`
+	Password        string                   `json:"password"`
+	ControlPassword string                   `json:"control_password,omitempty"`
+	Users           []ShadowsocksUser        `json:"users,omitempty"`
+	Destinations    []ShadowsocksDestination `json:"destinations,omitempty"`
 }
 
 type ShadowsocksUser struct {