hub.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package grpc
  2. import (
  3. "context"
  4. "time"
  5. goreality "github.com/xtls/reality"
  6. "github.com/xtls/xray-core/common"
  7. "github.com/xtls/xray-core/common/net"
  8. "github.com/xtls/xray-core/common/session"
  9. "github.com/xtls/xray-core/transport/internet"
  10. "github.com/xtls/xray-core/transport/internet/grpc/encoding"
  11. "github.com/xtls/xray-core/transport/internet/reality"
  12. "github.com/xtls/xray-core/transport/internet/tls"
  13. "google.golang.org/grpc"
  14. "google.golang.org/grpc/credentials"
  15. "google.golang.org/grpc/keepalive"
  16. )
  17. type Listener struct {
  18. encoding.UnimplementedGRPCServiceServer
  19. ctx context.Context
  20. handler internet.ConnHandler
  21. local net.Addr
  22. config *Config
  23. s *grpc.Server
  24. }
  25. func (l Listener) Tun(server encoding.GRPCService_TunServer) error {
  26. tunCtx, cancel := context.WithCancel(l.ctx)
  27. l.handler(encoding.NewHunkConn(server, cancel))
  28. <-tunCtx.Done()
  29. return nil
  30. }
  31. func (l Listener) TunMulti(server encoding.GRPCService_TunMultiServer) error {
  32. tunCtx, cancel := context.WithCancel(l.ctx)
  33. l.handler(encoding.NewMultiHunkConn(server, cancel))
  34. <-tunCtx.Done()
  35. return nil
  36. }
  37. func (l Listener) Close() error {
  38. l.s.Stop()
  39. return nil
  40. }
  41. func (l Listener) Addr() net.Addr {
  42. return l.local
  43. }
  44. func Listen(ctx context.Context, address net.Address, port net.Port, settings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
  45. grpcSettings := settings.ProtocolSettings.(*Config)
  46. var listener *Listener
  47. if port == net.Port(0) { // unix
  48. listener = &Listener{
  49. handler: handler,
  50. local: &net.UnixAddr{
  51. Name: address.Domain(),
  52. Net: "unix",
  53. },
  54. config: grpcSettings,
  55. }
  56. } else { // tcp
  57. listener = &Listener{
  58. handler: handler,
  59. local: &net.TCPAddr{
  60. IP: address.IP(),
  61. Port: int(port),
  62. },
  63. config: grpcSettings,
  64. }
  65. }
  66. listener.ctx = ctx
  67. config := tls.ConfigFromStreamSettings(settings)
  68. var options []grpc.ServerOption
  69. var s *grpc.Server
  70. if config != nil {
  71. // gRPC server may silently ignore TLS errors
  72. options = append(options, grpc.Creds(credentials.NewTLS(config.GetTLSConfig(tls.WithNextProto("h2")))))
  73. }
  74. if grpcSettings.IdleTimeout > 0 || grpcSettings.HealthCheckTimeout > 0 {
  75. options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{
  76. Time: time.Second * time.Duration(grpcSettings.IdleTimeout),
  77. Timeout: time.Second * time.Duration(grpcSettings.HealthCheckTimeout),
  78. }))
  79. }
  80. s = grpc.NewServer(options...)
  81. listener.s = s
  82. if settings.SocketSettings != nil && settings.SocketSettings.AcceptProxyProtocol {
  83. newError("accepting PROXY protocol").AtWarning().WriteToLog(session.ExportIDToError(ctx))
  84. }
  85. go func() {
  86. var streamListener net.Listener
  87. var err error
  88. if port == net.Port(0) { // unix
  89. streamListener, err = internet.ListenSystem(ctx, &net.UnixAddr{
  90. Name: address.Domain(),
  91. Net: "unix",
  92. }, settings.SocketSettings)
  93. if err != nil {
  94. newError("failed to listen on ", address).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  95. return
  96. }
  97. } else { // tcp
  98. streamListener, err = internet.ListenSystem(ctx, &net.TCPAddr{
  99. IP: address.IP(),
  100. Port: int(port),
  101. }, settings.SocketSettings)
  102. if err != nil {
  103. newError("failed to listen on ", address, ":", port).Base(err).AtError().WriteToLog(session.ExportIDToError(ctx))
  104. return
  105. }
  106. }
  107. newError("gRPC listen for service name `" + grpcSettings.getServiceName() + "` tun `" + grpcSettings.getTunStreamName() + "` multi tun `" + grpcSettings.getTunMultiStreamName() + "`").AtDebug().WriteToLog()
  108. encoding.RegisterGRPCServiceServerX(s, listener, grpcSettings.getServiceName(), grpcSettings.getTunStreamName(), grpcSettings.getTunMultiStreamName())
  109. if config := reality.ConfigFromStreamSettings(settings); config != nil {
  110. streamListener = goreality.NewListener(streamListener, config.GetREALITYConfig())
  111. }
  112. if err = s.Serve(streamListener); err != nil {
  113. newError("Listener for gRPC ended").Base(err).WriteToLog()
  114. }
  115. }()
  116. return listener, nil
  117. }
  118. func init() {
  119. common.Must(internet.RegisterTransportListener(protocolName, Listen))
  120. }