worker.go 11 KB


  1. package inbound
  2. import (
  3. "context"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/xtls/xray-core/v1/app/proxyman"
  8. "github.com/xtls/xray-core/v1/common"
  9. "github.com/xtls/xray-core/v1/common/buf"
  10. "github.com/xtls/xray-core/v1/common/net"
  11. "github.com/xtls/xray-core/v1/common/serial"
  12. "github.com/xtls/xray-core/v1/common/session"
  13. "github.com/xtls/xray-core/v1/common/signal/done"
  14. "github.com/xtls/xray-core/v1/common/task"
  15. "github.com/xtls/xray-core/v1/features/routing"
  16. "github.com/xtls/xray-core/v1/features/stats"
  17. "github.com/xtls/xray-core/v1/proxy"
  18. "github.com/xtls/xray-core/v1/transport/internet"
  19. "github.com/xtls/xray-core/v1/transport/internet/tcp"
  20. "github.com/xtls/xray-core/v1/transport/internet/udp"
  21. "github.com/xtls/xray-core/v1/transport/pipe"
  22. )
  23. type worker interface {
  24. Start() error
  25. Close() error
  26. Port() net.Port
  27. Proxy() proxy.Inbound
  28. }
  29. type tcpWorker struct {
  30. address net.Address
  31. port net.Port
  32. proxy proxy.Inbound
  33. stream *internet.MemoryStreamConfig
  34. recvOrigDest bool
  35. tag string
  36. dispatcher routing.Dispatcher
  37. sniffingConfig *proxyman.SniffingConfig
  38. uplinkCounter stats.Counter
  39. downlinkCounter stats.Counter
  40. hub internet.Listener
  41. ctx context.Context
  42. }
  43. func getTProxyType(s *internet.MemoryStreamConfig) internet.SocketConfig_TProxyMode {
  44. if s == nil || s.SocketSettings == nil {
  45. return internet.SocketConfig_Off
  46. }
  47. return s.SocketSettings.Tproxy
  48. }
  49. func (w *tcpWorker) callback(conn internet.Connection) {
  50. ctx, cancel := context.WithCancel(w.ctx)
  51. sid := session.NewID()
  52. ctx = session.ContextWithID(ctx, sid)
  53. if w.recvOrigDest {
  54. var dest net.Destination
  55. switch getTProxyType(w.stream) {
  56. case internet.SocketConfig_Redirect:
  57. d, err := tcp.GetOriginalDestination(conn)
  58. if err != nil {
  59. newError("failed to get original destination").Base(err).WriteToLog(session.ExportIDToError(ctx))
  60. } else {
  61. dest = d
  62. }
  63. case internet.SocketConfig_TProxy:
  64. dest = net.DestinationFromAddr(conn.LocalAddr())
  65. }
  66. if dest.IsValid() {
  67. ctx = session.ContextWithOutbound(ctx, &session.Outbound{
  68. Target: dest,
  69. })
  70. }
  71. }
  72. ctx = session.ContextWithInbound(ctx, &session.Inbound{
  73. Source: net.DestinationFromAddr(conn.RemoteAddr()),
  74. Gateway: net.TCPDestination(w.address, w.port),
  75. Tag: w.tag,
  76. })
  77. content := new(session.Content)
  78. if w.sniffingConfig != nil {
  79. content.SniffingRequest.Enabled = w.sniffingConfig.Enabled
  80. content.SniffingRequest.OverrideDestinationForProtocol = w.sniffingConfig.DestinationOverride
  81. }
  82. ctx = session.ContextWithContent(ctx, content)
  83. if w.uplinkCounter != nil || w.downlinkCounter != nil {
  84. conn = &internet.StatCouterConnection{
  85. Connection: conn,
  86. ReadCounter: w.uplinkCounter,
  87. WriteCounter: w.downlinkCounter,
  88. }
  89. }
  90. if err := w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher); err != nil {
  91. newError("connection ends").Base(err).WriteToLog(session.ExportIDToError(ctx))
  92. }
  93. cancel()
  94. if err := conn.Close(); err != nil {
  95. newError("failed to close connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
  96. }
  97. }
  98. func (w *tcpWorker) Proxy() proxy.Inbound {
  99. return w.proxy
  100. }
  101. func (w *tcpWorker) Start() error {
  102. ctx := context.Background()
  103. hub, err := internet.ListenTCP(ctx, w.address, w.port, w.stream, func(conn internet.Connection) {
  104. go w.callback(conn)
  105. })
  106. if err != nil {
  107. return newError("failed to listen TCP on ", w.port).AtWarning().Base(err)
  108. }
  109. w.hub = hub
  110. return nil
  111. }
  112. func (w *tcpWorker) Close() error {
  113. var errors []interface{}
  114. if w.hub != nil {
  115. if err := common.Close(w.hub); err != nil {
  116. errors = append(errors, err)
  117. }
  118. if err := common.Close(w.proxy); err != nil {
  119. errors = append(errors, err)
  120. }
  121. }
  122. if len(errors) > 0 {
  123. return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
  124. }
  125. return nil
  126. }
  127. func (w *tcpWorker) Port() net.Port {
  128. return w.port
  129. }
  130. type udpConn struct {
  131. lastActivityTime int64 // in seconds
  132. reader buf.Reader
  133. writer buf.Writer
  134. output func([]byte) (int, error)
  135. remote net.Addr
  136. local net.Addr
  137. done *done.Instance
  138. uplink stats.Counter
  139. downlink stats.Counter
  140. }
  141. func (c *udpConn) updateActivity() {
  142. atomic.StoreInt64(&c.lastActivityTime, time.Now().Unix())
  143. }
  144. // ReadMultiBuffer implements buf.Reader
  145. func (c *udpConn) ReadMultiBuffer() (buf.MultiBuffer, error) {
  146. mb, err := c.reader.ReadMultiBuffer()
  147. if err != nil {
  148. return nil, err
  149. }
  150. c.updateActivity()
  151. if c.uplink != nil {
  152. c.uplink.Add(int64(mb.Len()))
  153. }
  154. return mb, nil
  155. }
  156. func (c *udpConn) Read(buf []byte) (int, error) {
  157. panic("not implemented")
  158. }
  159. // Write implements io.Writer.
  160. func (c *udpConn) Write(buf []byte) (int, error) {
  161. n, err := c.output(buf)
  162. if c.downlink != nil {
  163. c.downlink.Add(int64(n))
  164. }
  165. if err == nil {
  166. c.updateActivity()
  167. }
  168. return n, err
  169. }
  170. func (c *udpConn) Close() error {
  171. common.Must(c.done.Close())
  172. common.Must(common.Close(c.writer))
  173. return nil
  174. }
  175. func (c *udpConn) RemoteAddr() net.Addr {
  176. return c.remote
  177. }
  178. func (c *udpConn) LocalAddr() net.Addr {
  179. return c.local
  180. }
  181. func (*udpConn) SetDeadline(time.Time) error {
  182. return nil
  183. }
  184. func (*udpConn) SetReadDeadline(time.Time) error {
  185. return nil
  186. }
  187. func (*udpConn) SetWriteDeadline(time.Time) error {
  188. return nil
  189. }
  190. type connID struct {
  191. src net.Destination
  192. dest net.Destination
  193. }
  194. type udpWorker struct {
  195. sync.RWMutex
  196. proxy proxy.Inbound
  197. hub *udp.Hub
  198. address net.Address
  199. port net.Port
  200. tag string
  201. stream *internet.MemoryStreamConfig
  202. dispatcher routing.Dispatcher
  203. uplinkCounter stats.Counter
  204. downlinkCounter stats.Counter
  205. checker *task.Periodic
  206. activeConn map[connID]*udpConn
  207. }
  208. func (w *udpWorker) getConnection(id connID) (*udpConn, bool) {
  209. w.Lock()
  210. defer w.Unlock()
  211. if conn, found := w.activeConn[id]; found && !conn.done.Done() {
  212. return conn, true
  213. }
  214. pReader, pWriter := pipe.New(pipe.DiscardOverflow(), pipe.WithSizeLimit(16*1024))
  215. conn := &udpConn{
  216. reader: pReader,
  217. writer: pWriter,
  218. output: func(b []byte) (int, error) {
  219. return w.hub.WriteTo(b, id.src)
  220. },
  221. remote: &net.UDPAddr{
  222. IP: id.src.Address.IP(),
  223. Port: int(id.src.Port),
  224. },
  225. local: &net.UDPAddr{
  226. IP: w.address.IP(),
  227. Port: int(w.port),
  228. },
  229. done: done.New(),
  230. uplink: w.uplinkCounter,
  231. downlink: w.downlinkCounter,
  232. }
  233. w.activeConn[id] = conn
  234. conn.updateActivity()
  235. return conn, false
  236. }
  237. func (w *udpWorker) callback(b *buf.Buffer, source net.Destination, originalDest net.Destination) {
  238. id := connID{
  239. src: source,
  240. }
  241. if originalDest.IsValid() {
  242. id.dest = originalDest
  243. }
  244. conn, existing := w.getConnection(id)
  245. // payload will be discarded in pipe is full.
  246. conn.writer.WriteMultiBuffer(buf.MultiBuffer{b})
  247. if !existing {
  248. common.Must(w.checker.Start())
  249. go func() {
  250. ctx := context.Background()
  251. sid := session.NewID()
  252. ctx = session.ContextWithID(ctx, sid)
  253. if originalDest.IsValid() {
  254. ctx = session.ContextWithOutbound(ctx, &session.Outbound{
  255. Target: originalDest,
  256. })
  257. }
  258. ctx = session.ContextWithInbound(ctx, &session.Inbound{
  259. Source: source,
  260. Gateway: net.UDPDestination(w.address, w.port),
  261. Tag: w.tag,
  262. })
  263. if err := w.proxy.Process(ctx, net.Network_UDP, conn, w.dispatcher); err != nil {
  264. newError("connection ends").Base(err).WriteToLog(session.ExportIDToError(ctx))
  265. }
  266. conn.Close()
  267. w.removeConn(id)
  268. }()
  269. }
  270. }
  271. func (w *udpWorker) removeConn(id connID) {
  272. w.Lock()
  273. delete(w.activeConn, id)
  274. w.Unlock()
  275. }
  276. func (w *udpWorker) handlePackets() {
  277. receive := w.hub.Receive()
  278. for payload := range receive {
  279. w.callback(payload.Payload, payload.Source, payload.Target)
  280. }
  281. }
  282. func (w *udpWorker) clean() error {
  283. nowSec := time.Now().Unix()
  284. w.Lock()
  285. defer w.Unlock()
  286. if len(w.activeConn) == 0 {
  287. return newError("no more connections. stopping...")
  288. }
  289. for addr, conn := range w.activeConn {
  290. if nowSec-atomic.LoadInt64(&conn.lastActivityTime) > 8 { // TODO Timeout too small
  291. delete(w.activeConn, addr)
  292. conn.Close()
  293. }
  294. }
  295. if len(w.activeConn) == 0 {
  296. w.activeConn = make(map[connID]*udpConn, 16)
  297. }
  298. return nil
  299. }
  300. func (w *udpWorker) Start() error {
  301. w.activeConn = make(map[connID]*udpConn, 16)
  302. ctx := context.Background()
  303. h, err := udp.ListenUDP(ctx, w.address, w.port, w.stream, udp.HubCapacity(256))
  304. if err != nil {
  305. return err
  306. }
  307. w.checker = &task.Periodic{
  308. Interval: time.Second * 16,
  309. Execute: w.clean,
  310. }
  311. w.hub = h
  312. go w.handlePackets()
  313. return nil
  314. }
  315. func (w *udpWorker) Close() error {
  316. w.Lock()
  317. defer w.Unlock()
  318. var errors []interface{}
  319. if w.hub != nil {
  320. if err := w.hub.Close(); err != nil {
  321. errors = append(errors, err)
  322. }
  323. }
  324. if w.checker != nil {
  325. if err := w.checker.Close(); err != nil {
  326. errors = append(errors, err)
  327. }
  328. }
  329. if err := common.Close(w.proxy); err != nil {
  330. errors = append(errors, err)
  331. }
  332. if len(errors) > 0 {
  333. return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
  334. }
  335. return nil
  336. }
  337. func (w *udpWorker) Port() net.Port {
  338. return w.port
  339. }
  340. func (w *udpWorker) Proxy() proxy.Inbound {
  341. return w.proxy
  342. }
  343. type dsWorker struct {
  344. address net.Address
  345. proxy proxy.Inbound
  346. stream *internet.MemoryStreamConfig
  347. tag string
  348. dispatcher routing.Dispatcher
  349. sniffingConfig *proxyman.SniffingConfig
  350. uplinkCounter stats.Counter
  351. downlinkCounter stats.Counter
  352. hub internet.Listener
  353. ctx context.Context
  354. }
  355. func (w *dsWorker) callback(conn internet.Connection) {
  356. ctx, cancel := context.WithCancel(w.ctx)
  357. sid := session.NewID()
  358. ctx = session.ContextWithID(ctx, sid)
  359. ctx = session.ContextWithInbound(ctx, &session.Inbound{
  360. Source: net.DestinationFromAddr(conn.RemoteAddr()),
  361. Gateway: net.UnixDestination(w.address),
  362. Tag: w.tag,
  363. })
  364. content := new(session.Content)
  365. if w.sniffingConfig != nil {
  366. content.SniffingRequest.Enabled = w.sniffingConfig.Enabled
  367. content.SniffingRequest.OverrideDestinationForProtocol = w.sniffingConfig.DestinationOverride
  368. }
  369. ctx = session.ContextWithContent(ctx, content)
  370. if w.uplinkCounter != nil || w.downlinkCounter != nil {
  371. conn = &internet.StatCouterConnection{
  372. Connection: conn,
  373. ReadCounter: w.uplinkCounter,
  374. WriteCounter: w.downlinkCounter,
  375. }
  376. }
  377. if err := w.proxy.Process(ctx, net.Network_UNIX, conn, w.dispatcher); err != nil {
  378. newError("connection ends").Base(err).WriteToLog(session.ExportIDToError(ctx))
  379. }
  380. cancel()
  381. if err := conn.Close(); err != nil {
  382. newError("failed to close connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
  383. }
  384. }
  385. func (w *dsWorker) Proxy() proxy.Inbound {
  386. return w.proxy
  387. }
  388. func (w *dsWorker) Port() net.Port {
  389. return net.Port(0)
  390. }
  391. func (w *dsWorker) Start() error {
  392. ctx := context.Background()
  393. hub, err := internet.ListenUnix(ctx, w.address, w.stream, func(conn internet.Connection) {
  394. go w.callback(conn)
  395. })
  396. if err != nil {
  397. return newError("failed to listen Unix Domain Socket on ", w.address).AtWarning().Base(err)
  398. }
  399. w.hub = hub
  400. return nil
  401. }
  402. func (w *dsWorker) Close() error {
  403. var errors []interface{}
  404. if w.hub != nil {
  405. if err := common.Close(w.hub); err != nil {
  406. errors = append(errors, err)
  407. }
  408. if err := common.Close(w.proxy); err != nil {
  409. errors = append(errors, err)
  410. }
  411. }
  412. if len(errors) > 0 {
  413. return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
  414. }
  415. return nil
  416. }