hub.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package grpc
  2. import (
  3. "context"
  4. "google.golang.org/grpc"
  5. "google.golang.org/grpc/credentials"
  6. "github.com/xtls/xray-core/common"
  7. "github.com/xtls/xray-core/common/net"
  8. "github.com/xtls/xray-core/common/session"
  9. "github.com/xtls/xray-core/transport/internet"
  10. "github.com/xtls/xray-core/transport/internet/grpc/encoding"
  11. "github.com/xtls/xray-core/transport/internet/tls"
  12. )
  13. type Listener struct {
  14. encoding.UnimplementedGRPCServiceServer
  15. ctx context.Context
  16. handler internet.ConnHandler
  17. local net.Addr
  18. config *Config
  19. locker *internet.FileLocker // for unix domain socket
  20. s *grpc.Server
  21. }
  22. func (l Listener) Tun(server encoding.GRPCService_TunServer) error {
  23. tunCtx, cancel := context.WithCancel(l.ctx)
  24. l.handler(encoding.NewHunkConn(server, cancel))
  25. <-tunCtx.Done()
  26. return nil
  27. }
  28. func (l Listener) TunMulti(server encoding.GRPCService_TunMultiServer) error {
  29. tunCtx, cancel := context.WithCancel(l.ctx)
  30. l.handler(encoding.NewMultiHunkConn(server, cancel))
  31. <-tunCtx.Done()
  32. return nil
  33. }
  34. func (l Listener) Close() error {
  35. l.s.Stop()
  36. return nil
  37. }
  38. func (l Listener) Addr() net.Addr {
  39. return l.local
  40. }
  41. func Listen(ctx context.Context, address net.Address, port net.Port, settings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
  42. grpcSettings := settings.ProtocolSettings.(*Config)
  43. var listener *Listener
  44. if port == net.Port(0) { // unix
  45. listener = &Listener{
  46. handler: handler,
  47. local: &net.UnixAddr{
  48. Name: address.Domain(),
  49. Net: "unix",
  50. },
  51. config: grpcSettings,
  52. }
  53. } else { // tcp
  54. listener = &Listener{
  55. handler: handler,
  56. local: &net.TCPAddr{
  57. IP: address.IP(),
  58. Port: int(port),
  59. },
  60. config: grpcSettings,
  61. }
  62. }
  63. listener.ctx = ctx
  64. config := tls.ConfigFromStreamSettings(settings)
  65. var s *grpc.Server
  66. if config == nil {
  67. s = grpc.NewServer()
  68. } else {
  69. s = grpc.NewServer(grpc.Creds(credentials.NewTLS(config.GetTLSConfig(tls.WithNextProto("h2")))))
  70. }
  71. listener.s = s
  72. if settings.SocketSettings != nil && settings.SocketSettings.AcceptProxyProtocol {
  73. newError("accepting PROXY protocol").AtWarning().WriteToLog(session.ExportIDToError(ctx))
  74. }
  75. go func() {
  76. var streamListener net.Listener
  77. var err error
  78. if port == net.Port(0) { // unix
  79. streamListener, err = internet.ListenSystem(ctx, &net.UnixAddr{
  80. Name: address.Domain(),
  81. Net: "unix",
  82. }, settings.SocketSettings)
  83. if err != nil {
  84. newError("failed to listen on ", address).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  85. return
  86. }
  87. locker := ctx.Value(address.Domain())
  88. if locker != nil {
  89. listener.locker = locker.(*internet.FileLocker)
  90. }
  91. } else { // tcp
  92. streamListener, err = internet.ListenSystem(ctx, &net.TCPAddr{
  93. IP: address.IP(),
  94. Port: int(port),
  95. }, settings.SocketSettings)
  96. if err != nil {
  97. newError("failed to listen on ", address, ":", port).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  98. return
  99. }
  100. }
  101. encoding.RegisterGRPCServiceServerX(s, listener, grpcSettings.ServiceName)
  102. if err = s.Serve(streamListener); err != nil {
  103. newError("Listener for gRPC ended").Base(err).WriteToLog()
  104. }
  105. }()
  106. return listener, nil
  107. }
  108. func init() {
  109. common.Must(internet.RegisterTransportListener(protocolName, Listen))
  110. }