hub.go 5.0 KB


  1. package http
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "os"
  8. "strings"
  9. "time"
  10. "golang.org/x/net/http2"
  11. "golang.org/x/net/http2/h2c"
  12. "github.com/xtls/xray-core/common"
  13. "github.com/xtls/xray-core/common/net"
  14. "github.com/xtls/xray-core/common/net/cnc"
  15. http_proto "github.com/xtls/xray-core/common/protocol/http"
  16. "github.com/xtls/xray-core/common/serial"
  17. "github.com/xtls/xray-core/common/session"
  18. "github.com/xtls/xray-core/common/signal/done"
  19. "github.com/xtls/xray-core/transport/internet"
  20. "github.com/xtls/xray-core/transport/internet/tls"
  21. )
  22. type Listener struct {
  23. server *http.Server
  24. handler internet.ConnHandler
  25. local net.Addr
  26. config *Config
  27. locker *internet.FileLocker // for unix domain socket
  28. }
  29. func (l *Listener) Addr() net.Addr {
  30. return l.local
  31. }
  32. func (l *Listener) Close() error {
  33. if l.locker != nil {
  34. fmt.Fprintln(os.Stderr, "RELEASE LOCK")
  35. l.locker.Release()
  36. }
  37. return l.server.Close()
  38. }
  39. type flushWriter struct {
  40. w io.Writer
  41. d *done.Instance
  42. }
  43. func (fw flushWriter) Write(p []byte) (n int, err error) {
  44. if fw.d.Done() {
  45. return 0, io.ErrClosedPipe
  46. }
  47. n, err = fw.w.Write(p)
  48. if f, ok := fw.w.(http.Flusher); ok {
  49. f.Flush()
  50. }
  51. return
  52. }
  53. func (l *Listener) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
  54. host := request.Host
  55. if !l.config.isValidHost(host) {
  56. writer.WriteHeader(404)
  57. return
  58. }
  59. path := l.config.getNormalizedPath()
  60. if !strings.HasPrefix(request.URL.Path, path) {
  61. writer.WriteHeader(404)
  62. return
  63. }
  64. writer.Header().Set("Cache-Control", "no-store")
  65. writer.WriteHeader(200)
  66. if f, ok := writer.(http.Flusher); ok {
  67. f.Flush()
  68. }
  69. remoteAddr := l.Addr()
  70. dest, err := net.ParseDestination(request.RemoteAddr)
  71. if err != nil {
  72. newError("failed to parse request remote addr: ", request.RemoteAddr).Base(err).WriteToLog()
  73. } else {
  74. remoteAddr = &net.TCPAddr{
  75. IP: dest.Address.IP(),
  76. Port: int(dest.Port),
  77. }
  78. }
  79. forwardedAddrs := http_proto.ParseXForwardedFor(request.Header)
  80. if len(forwardedAddrs) > 0 && forwardedAddrs[0].Family().IsIP() {
  81. remoteAddr = &net.TCPAddr{
  82. IP: forwardedAddrs[0].IP(),
  83. Port: int(0),
  84. }
  85. }
  86. done := done.New()
  87. conn := cnc.NewConnection(
  88. cnc.ConnectionOutput(request.Body),
  89. cnc.ConnectionInput(flushWriter{w: writer, d: done}),
  90. cnc.ConnectionOnClose(common.ChainedClosable{done, request.Body}),
  91. cnc.ConnectionLocalAddr(l.Addr()),
  92. cnc.ConnectionRemoteAddr(remoteAddr),
  93. )
  94. l.handler(conn)
  95. <-done.Wait()
  96. }
  97. func Listen(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
  98. httpSettings := streamSettings.ProtocolSettings.(*Config)
  99. var listener *Listener
  100. if port == net.Port(0) { // unix
  101. listener = &Listener{
  102. handler: handler,
  103. local: &net.UnixAddr{
  104. Name: address.Domain(),
  105. Net: "unix",
  106. },
  107. config: httpSettings,
  108. }
  109. } else { // tcp
  110. listener = &Listener{
  111. handler: handler,
  112. local: &net.TCPAddr{
  113. IP: address.IP(),
  114. Port: int(port),
  115. },
  116. config: httpSettings,
  117. }
  118. }
  119. var server *http.Server
  120. config := tls.ConfigFromStreamSettings(streamSettings)
  121. if config == nil {
  122. h2s := &http2.Server{}
  123. server = &http.Server{
  124. Addr: serial.Concat(address, ":", port),
  125. Handler: h2c.NewHandler(listener, h2s),
  126. ReadHeaderTimeout: time.Second * 4,
  127. }
  128. } else {
  129. server = &http.Server{
  130. Addr: serial.Concat(address, ":", port),
  131. TLSConfig: config.GetTLSConfig(tls.WithNextProto("h2")),
  132. Handler: listener,
  133. ReadHeaderTimeout: time.Second * 4,
  134. }
  135. }
  136. if streamSettings.SocketSettings != nil && streamSettings.SocketSettings.AcceptProxyProtocol {
  137. newError("accepting PROXY protocol").AtWarning().WriteToLog(session.ExportIDToError(ctx))
  138. }
  139. listener.server = server
  140. go func() {
  141. var streamListener net.Listener
  142. var err error
  143. if port == net.Port(0) { // unix
  144. streamListener, err = internet.ListenSystem(ctx, &net.UnixAddr{
  145. Name: address.Domain(),
  146. Net: "unix",
  147. }, streamSettings.SocketSettings)
  148. if err != nil {
  149. newError("failed to listen on ", address).Base(err).WriteToLog(session.ExportIDToError(ctx))
  150. return
  151. }
  152. locker := ctx.Value(address.Domain())
  153. if locker != nil {
  154. listener.locker = locker.(*internet.FileLocker)
  155. }
  156. } else { // tcp
  157. streamListener, err = internet.ListenSystem(ctx, &net.TCPAddr{
  158. IP: address.IP(),
  159. Port: int(port),
  160. }, streamSettings.SocketSettings)
  161. if err != nil {
  162. newError("failed to listen on ", address, ":", port).Base(err).WriteToLog(session.ExportIDToError(ctx))
  163. return
  164. }
  165. }
  166. if config == nil {
  167. err = server.Serve(streamListener)
  168. if err != nil {
  169. newError("stoping serving H2C").Base(err).WriteToLog(session.ExportIDToError(ctx))
  170. }
  171. } else {
  172. err = server.ServeTLS(streamListener, "", "")
  173. if err != nil {
  174. newError("stoping serving TLS").Base(err).WriteToLog(session.ExportIDToError(ctx))
  175. }
  176. }
  177. }()
  178. return listener, nil
  179. }
  180. func init() {
  181. common.Must(internet.RegisterTransportListener(protocolName, Listen))
  182. }