http2_client.go 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "math"
  24. "net"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "golang.org/x/net/http2"
  31. "golang.org/x/net/http2/hpack"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/credentials"
  34. "google.golang.org/grpc/internal"
  35. "google.golang.org/grpc/internal/channelz"
  36. "google.golang.org/grpc/internal/syscall"
  37. "google.golang.org/grpc/keepalive"
  38. "google.golang.org/grpc/metadata"
  39. "google.golang.org/grpc/peer"
  40. "google.golang.org/grpc/stats"
  41. "google.golang.org/grpc/status"
  42. )
  43. // clientConnectionCounter counts the client connections initiated so far.
  44. // newHTTP2Client increments it with atomic.AddUint64 once the preface and
  45. // initial settings are written, and stores the result in connectionID. Must be accessed atomically.
  46. var clientConnectionCounter uint64
  47. // http2Client implements the ClientTransport interface with HTTP2.
  48. type http2Client struct {
  49. lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
  50. ctx context.Context // transport-scoped context created in newHTTP2Client (possibly tagged by the stats handler)
  51. cancel context.CancelFunc // cancels ctx; invoked by Close
  52. ctxDone <-chan struct{} // Cache the ctx.Done() chan.
  53. userAgent string // value sent in the "user-agent" header of every stream
  54. md interface{} // address metadata; when it is a *metadata.MD its entries are added to outgoing headers
  55. conn net.Conn // underlying communication channel
  56. loopy *loopyWriter // assigned by the writer goroutine started at the end of newHTTP2Client
  57. remoteAddr net.Addr // conn.RemoteAddr() captured at construction
  58. localAddr net.Addr // conn.LocalAddr() captured at construction
  59. authInfo credentials.AuthInfo // auth info about the connection
  60. readerDone chan struct{} // sync point to enable testing.
  61. writerDone chan struct{} // sync point to enable testing.
  62. // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
  63. // that the server sent GoAway on this transport.
  64. goAway chan struct{}
  65. framer *framer // built by newFramer over conn with the configured buffer sizes
  66. // controlBuf delivers all the control related tasks (e.g., window
  67. // updates, reset streams, and various settings) to the controller.
  68. controlBuf *controlBuffer
  69. fc *trInFlow // connection-level inbound flow control; limit starts at the initial connection window
  70. // The scheme used: https if TLS is on, http otherwise.
  71. scheme string
  72. isSecure bool // true once a transport-credentials handshake has succeeded
  73. perRPCCreds []credentials.PerRPCCredentials // dial-time per-RPC credentials, including any from the creds bundle
  74. kp keepalive.ClientParameters // keepalive parameters with defaults applied for zero Time/Timeout
  75. keepaliveEnabled bool // true when kp.Time is not infinity
  76. statsHandler stats.Handler // optional; nil-checked before every use
  77. initialWindowSize int32 // initial per-stream inbound window; used as each new stream's inFlow limit
  78. // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
  79. maxSendHeaderListSize *uint32
  80. bdpEst *bdpEstimator // set only when neither window size was configured explicitly (dynamic window)
  81. // onPrefaceReceipt is a callback that client transport calls upon
  82. // receiving server preface to signal that a successful HTTP2
  83. // connection was established.
  84. onPrefaceReceipt func()
  85. maxConcurrentStreams uint32 // initialised to defaultMaxStreamsClient
  86. streamQuota int64 // remaining stream slots; read and written inside controlBuf.executeAndPut callbacks, may go negative
  87. streamsQuotaAvailable chan struct{} // buffered (size 1); signalled when quota is available and streams are waiting
  88. waitingStreams uint32 // number of NewStream calls currently waiting for stream quota
  89. nextID uint32 // ID for the next stream; starts at 1 and advances by 2
  90. mu sync.Mutex // guard the following variables
  91. state transportState // lifecycle state; Close moves it to closing
  92. activeStreams map[uint32]*Stream // live streams keyed by stream ID; set to nil by Close
  93. // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
  94. prevGoAwayID uint32
  95. // goAwayReason records the http2.ErrCode and debug data received with the
  96. // GoAway frame.
  97. goAwayReason GoAwayReason
  98. // A condition variable used to signal when the keepalive goroutine should
  99. // go dormant. The condition for dormancy is based on the number of active
  100. // streams and the `PermitWithoutStream` keepalive client parameter. And
  101. // since the number of active streams is guarded by the above mutex, we use
  102. // the same for this condition variable as well.
  103. kpDormancyCond *sync.Cond
  104. // A boolean to track whether the keepalive goroutine is dormant or not.
  105. // This is checked before attempting to signal the above condition
  106. // variable.
  107. kpDormant bool
  108. // Fields below are for channelz metric collection.
  109. channelzID int64 // channelz unique identification number
  110. czData *channelzData
  111. onGoAway func(GoAwayReason) // callback supplied to newHTTP2Client; presumably run when a GOAWAY is handled (not visible here)
  112. onClose func() // callback supplied to newHTTP2Client; invoked by Close while mu is held
  113. bufferPool *bufferPool // its put method is handed to stream readers as freeBuffer
  114. connectionID uint64 // value of clientConnectionCounter assigned to this transport
  115. }
  // dial opens the underlying connection to addr. A caller-supplied dialer fn
  // takes precedence; without one, a plain TCP connection is established with a
  // zero-value net.Dialer. ctx bounds the dial in both cases.
  func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
  	if fn == nil {
  		var d net.Dialer
  		return d.DialContext(ctx, "tcp", addr)
  	}
  	return fn(ctx, addr)
  }
  // isTemporary reports whether err should be treated as a transient failure.
  //
  // An error exposing Temporary() decides for itself. Otherwise an error
  // exposing Timeout() is temporary exactly when it timed out, since timeouts
  // may be resolved upon retry. Any other error (including nil) is assumed to
  // be temporary.
  func isTemporary(err error) bool {
  	if tmp, ok := err.(interface{ Temporary() bool }); ok {
  		return tmp.Temporary()
  	}
  	if to, ok := err.(interface{ Timeout() bool }); ok {
  		return to.Timeout()
  	}
  	return true
  }
  137. // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
  138. // and starts to receive messages on it. Non-nil error returns if construction
  139. // fails.
  140. func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
  141. scheme := "http"
  142. ctx, cancel := context.WithCancel(ctx)
  143. defer func() {
  144. if err != nil {
  145. cancel()
  146. }
  147. }()
  148. conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
  149. if err != nil {
  150. if opts.FailOnNonTempDialError {
  151. return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
  152. }
  153. return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
  154. }
  155. // Any further errors will close the underlying connection
  156. defer func(conn net.Conn) {
  157. if err != nil {
  158. conn.Close()
  159. }
  160. }(conn)
  161. kp := opts.KeepaliveParams
  162. // Validate keepalive parameters.
  163. if kp.Time == 0 {
  164. kp.Time = defaultClientKeepaliveTime
  165. }
  166. if kp.Timeout == 0 {
  167. kp.Timeout = defaultClientKeepaliveTimeout
  168. }
  169. keepaliveEnabled := false
  170. if kp.Time != infinity {
  171. if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
  172. return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
  173. }
  174. keepaliveEnabled = true
  175. }
  176. var (
  177. isSecure bool
  178. authInfo credentials.AuthInfo
  179. )
  180. transportCreds := opts.TransportCredentials
  181. perRPCCreds := opts.PerRPCCredentials
  182. if b := opts.CredsBundle; b != nil {
  183. if t := b.TransportCredentials(); t != nil {
  184. transportCreds = t
  185. }
  186. if t := b.PerRPCCredentials(); t != nil {
  187. perRPCCreds = append(perRPCCreds, t)
  188. }
  189. }
  190. if transportCreds != nil {
  191. scheme = "https"
  192. conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
  193. if err != nil {
  194. return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
  195. }
  196. isSecure = true
  197. }
  198. dynamicWindow := true
  199. icwz := int32(initialWindowSize)
  200. if opts.InitialConnWindowSize >= defaultWindowSize {
  201. icwz = opts.InitialConnWindowSize
  202. dynamicWindow = false
  203. }
  204. writeBufSize := opts.WriteBufferSize
  205. readBufSize := opts.ReadBufferSize
  206. maxHeaderListSize := defaultClientMaxHeaderListSize
  207. if opts.MaxHeaderListSize != nil {
  208. maxHeaderListSize = *opts.MaxHeaderListSize
  209. }
  210. t := &http2Client{
  211. ctx: ctx,
  212. ctxDone: ctx.Done(), // Cache Done chan.
  213. cancel: cancel,
  214. userAgent: opts.UserAgent,
  215. md: addr.Metadata,
  216. conn: conn,
  217. remoteAddr: conn.RemoteAddr(),
  218. localAddr: conn.LocalAddr(),
  219. authInfo: authInfo,
  220. readerDone: make(chan struct{}),
  221. writerDone: make(chan struct{}),
  222. goAway: make(chan struct{}),
  223. framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
  224. fc: &trInFlow{limit: uint32(icwz)},
  225. scheme: scheme,
  226. activeStreams: make(map[uint32]*Stream),
  227. isSecure: isSecure,
  228. perRPCCreds: perRPCCreds,
  229. kp: kp,
  230. statsHandler: opts.StatsHandler,
  231. initialWindowSize: initialWindowSize,
  232. onPrefaceReceipt: onPrefaceReceipt,
  233. nextID: 1,
  234. maxConcurrentStreams: defaultMaxStreamsClient,
  235. streamQuota: defaultMaxStreamsClient,
  236. streamsQuotaAvailable: make(chan struct{}, 1),
  237. czData: new(channelzData),
  238. onGoAway: onGoAway,
  239. onClose: onClose,
  240. keepaliveEnabled: keepaliveEnabled,
  241. bufferPool: newBufferPool(),
  242. }
  243. t.controlBuf = newControlBuffer(t.ctxDone)
  244. if opts.InitialWindowSize >= defaultWindowSize {
  245. t.initialWindowSize = opts.InitialWindowSize
  246. dynamicWindow = false
  247. }
  248. if dynamicWindow {
  249. t.bdpEst = &bdpEstimator{
  250. bdp: initialWindowSize,
  251. updateFlowControl: t.updateFlowControl,
  252. }
  253. }
  254. if t.statsHandler != nil {
  255. t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
  256. RemoteAddr: t.remoteAddr,
  257. LocalAddr: t.localAddr,
  258. })
  259. connBegin := &stats.ConnBegin{
  260. Client: true,
  261. }
  262. t.statsHandler.HandleConn(t.ctx, connBegin)
  263. }
  264. if channelz.IsOn() {
  265. t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
  266. }
  267. if t.keepaliveEnabled {
  268. t.kpDormancyCond = sync.NewCond(&t.mu)
  269. go t.keepalive()
  270. }
  271. // Start the reader goroutine for incoming message. Each transport has
  272. // a dedicated goroutine which reads HTTP2 frame from network. Then it
  273. // dispatches the frame to the corresponding stream entity.
  274. go t.reader()
  275. // Send connection preface to server.
  276. n, err := t.conn.Write(clientPreface)
  277. if err != nil {
  278. t.Close()
  279. return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
  280. }
  281. if n != len(clientPreface) {
  282. t.Close()
  283. return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
  284. }
  285. var ss []http2.Setting
  286. if t.initialWindowSize != defaultWindowSize {
  287. ss = append(ss, http2.Setting{
  288. ID: http2.SettingInitialWindowSize,
  289. Val: uint32(t.initialWindowSize),
  290. })
  291. }
  292. if opts.MaxHeaderListSize != nil {
  293. ss = append(ss, http2.Setting{
  294. ID: http2.SettingMaxHeaderListSize,
  295. Val: *opts.MaxHeaderListSize,
  296. })
  297. }
  298. err = t.framer.fr.WriteSettings(ss...)
  299. if err != nil {
  300. t.Close()
  301. return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
  302. }
  303. // Adjust the connection flow control window if needed.
  304. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  305. if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
  306. t.Close()
  307. return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
  308. }
  309. }
  310. t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
  311. if err := t.framer.writer.Flush(); err != nil {
  312. return nil, err
  313. }
  314. go func() {
  315. t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
  316. err := t.loopy.run()
  317. if err != nil {
  318. errorf("transport: loopyWriter.run returning. Err: %v", err)
  319. }
  320. // If it's a connection error, let reader goroutine handle it
  321. // since there might be data in the buffers.
  322. if _, ok := err.(net.Error); !ok {
  323. t.conn.Close()
  324. }
  325. close(t.writerDone)
  326. }()
  327. return t, nil
  328. }
  329. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  330. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  331. s := &Stream{
  332. ct: t,
  333. done: make(chan struct{}),
  334. method: callHdr.Method,
  335. sendCompress: callHdr.SendCompress,
  336. buf: newRecvBuffer(),
  337. headerChan: make(chan struct{}),
  338. contentSubtype: callHdr.ContentSubtype,
  339. }
  340. s.wq = newWriteQuota(defaultWriteQuota, s.done)
  341. s.requestRead = func(n int) {
  342. t.adjustWindow(s, uint32(n))
  343. }
  344. // The client side stream context should have exactly the same life cycle with the user provided context.
  345. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  346. // So we use the original context here instead of creating a copy.
  347. s.ctx = ctx
  348. s.trReader = &transportReader{
  349. reader: &recvBufferReader{
  350. ctx: s.ctx,
  351. ctxDone: s.ctx.Done(),
  352. recv: s.buf,
  353. closeStream: func(err error) {
  354. t.CloseStream(s, err)
  355. },
  356. freeBuffer: t.bufferPool.put,
  357. },
  358. windowHandler: func(n int) {
  359. t.updateWindow(s, uint32(n))
  360. },
  361. }
  362. return s
  363. }
  364. func (t *http2Client) getPeer() *peer.Peer {
  365. return &peer.Peer{
  366. Addr: t.remoteAddr,
  367. AuthInfo: t.authInfo,
  368. }
  369. }
  370. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  371. aud := t.createAudience(callHdr)
  372. ri := credentials.RequestInfo{
  373. Method: callHdr.Method,
  374. AuthInfo: t.authInfo,
  375. }
  376. ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
  377. authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
  378. if err != nil {
  379. return nil, err
  380. }
  381. callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
  382. if err != nil {
  383. return nil, err
  384. }
  385. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  386. // first and create a slice of that exact size.
  387. // Make the slice of certain predictable size to reduce allocations made by append.
  388. hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
  389. hfLen += len(authData) + len(callAuthData)
  390. headerFields := make([]hpack.HeaderField, 0, hfLen)
  391. headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
  392. headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  393. headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  394. headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  395. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
  396. headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  397. headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
  398. if callHdr.PreviousAttempts > 0 {
  399. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
  400. }
  401. if callHdr.SendCompress != "" {
  402. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  403. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
  404. }
  405. if dl, ok := ctx.Deadline(); ok {
  406. // Send out timeout regardless its value. The server can detect timeout context by itself.
  407. // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
  408. timeout := time.Until(dl)
  409. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
  410. }
  411. for k, v := range authData {
  412. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  413. }
  414. for k, v := range callAuthData {
  415. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  416. }
  417. if b := stats.OutgoingTags(ctx); b != nil {
  418. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
  419. }
  420. if b := stats.OutgoingTrace(ctx); b != nil {
  421. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
  422. }
  423. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  424. var k string
  425. for k, vv := range md {
  426. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  427. if isReservedHeader(k) {
  428. continue
  429. }
  430. for _, v := range vv {
  431. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  432. }
  433. }
  434. for _, vv := range added {
  435. for i, v := range vv {
  436. if i%2 == 0 {
  437. k = v
  438. continue
  439. }
  440. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  441. if isReservedHeader(k) {
  442. continue
  443. }
  444. headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
  445. }
  446. }
  447. }
  448. if md, ok := t.md.(*metadata.MD); ok {
  449. for k, vv := range *md {
  450. if isReservedHeader(k) {
  451. continue
  452. }
  453. for _, v := range vv {
  454. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  455. }
  456. }
  457. }
  458. return headerFields, nil
  459. }
  460. func (t *http2Client) createAudience(callHdr *CallHdr) string {
  461. // Create an audience string only if needed.
  462. if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
  463. return ""
  464. }
  465. // Construct URI required to get auth request metadata.
  466. // Omit port if it is the default one.
  467. host := strings.TrimSuffix(callHdr.Host, ":443")
  468. pos := strings.LastIndex(callHdr.Method, "/")
  469. if pos == -1 {
  470. pos = len(callHdr.Method)
  471. }
  472. return "https://" + host + callHdr.Method[:pos]
  473. }
  474. func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
  475. if len(t.perRPCCreds) == 0 {
  476. return nil, nil
  477. }
  478. authData := map[string]string{}
  479. for _, c := range t.perRPCCreds {
  480. data, err := c.GetRequestMetadata(ctx, audience)
  481. if err != nil {
  482. if _, ok := status.FromError(err); ok {
  483. return nil, err
  484. }
  485. return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
  486. }
  487. for k, v := range data {
  488. // Capital header names are illegal in HTTP/2.
  489. k = strings.ToLower(k)
  490. authData[k] = v
  491. }
  492. }
  493. return authData, nil
  494. }
  495. func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
  496. var callAuthData map[string]string
  497. // Check if credentials.PerRPCCredentials were provided via call options.
  498. // Note: if these credentials are provided both via dial options and call
  499. // options, then both sets of credentials will be applied.
  500. if callCreds := callHdr.Creds; callCreds != nil {
  501. if !t.isSecure && callCreds.RequireTransportSecurity() {
  502. return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
  503. }
  504. data, err := callCreds.GetRequestMetadata(ctx, audience)
  505. if err != nil {
  506. return nil, status.Errorf(codes.Internal, "transport: %v", err)
  507. }
  508. callAuthData = make(map[string]string, len(data))
  509. for k, v := range data {
  510. // Capital header names are illegal in HTTP/2
  511. k = strings.ToLower(k)
  512. callAuthData[k] = v
  513. }
  514. }
  515. return callAuthData, nil
  516. }
  517. // NewStream creates a stream and registers it into the transport as "active"
  518. // streams.
  519. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  520. ctx = peer.NewContext(ctx, t.getPeer())
  521. headerFields, err := t.createHeaderFields(ctx, callHdr)
  522. if err != nil {
  523. return nil, err
  524. }
  525. s := t.newStream(ctx, callHdr)
  526. cleanup := func(err error) {
  527. if s.swapState(streamDone) == streamDone {
  528. // If it was already done, return.
  529. return
  530. }
  531. // The stream was unprocessed by the server.
  532. atomic.StoreUint32(&s.unprocessed, 1)
  533. s.write(recvMsg{err: err})
  534. close(s.done)
  535. // If headerChan isn't closed, then close it.
  536. if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
  537. close(s.headerChan)
  538. }
  539. }
  540. hdr := &headerFrame{
  541. hf: headerFields,
  542. endStream: false,
  543. initStream: func(id uint32) error {
  544. t.mu.Lock()
  545. if state := t.state; state != reachable {
  546. t.mu.Unlock()
  547. // Do a quick cleanup.
  548. err := error(errStreamDrain)
  549. if state == closing {
  550. err = ErrConnClosing
  551. }
  552. cleanup(err)
  553. return err
  554. }
  555. t.activeStreams[id] = s
  556. if channelz.IsOn() {
  557. atomic.AddInt64(&t.czData.streamsStarted, 1)
  558. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  559. }
  560. // If the keepalive goroutine has gone dormant, wake it up.
  561. if t.kpDormant {
  562. t.kpDormancyCond.Signal()
  563. }
  564. t.mu.Unlock()
  565. return nil
  566. },
  567. onOrphaned: cleanup,
  568. wq: s.wq,
  569. }
  570. firstTry := true
  571. var ch chan struct{}
  572. checkForStreamQuota := func(it interface{}) bool {
  573. if t.streamQuota <= 0 { // Can go negative if server decreases it.
  574. if firstTry {
  575. t.waitingStreams++
  576. }
  577. ch = t.streamsQuotaAvailable
  578. return false
  579. }
  580. if !firstTry {
  581. t.waitingStreams--
  582. }
  583. t.streamQuota--
  584. h := it.(*headerFrame)
  585. h.streamID = t.nextID
  586. t.nextID += 2
  587. s.id = h.streamID
  588. s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
  589. if t.streamQuota > 0 && t.waitingStreams > 0 {
  590. select {
  591. case t.streamsQuotaAvailable <- struct{}{}:
  592. default:
  593. }
  594. }
  595. return true
  596. }
  597. var hdrListSizeErr error
  598. checkForHeaderListSize := func(it interface{}) bool {
  599. if t.maxSendHeaderListSize == nil {
  600. return true
  601. }
  602. hdrFrame := it.(*headerFrame)
  603. var sz int64
  604. for _, f := range hdrFrame.hf {
  605. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  606. hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
  607. return false
  608. }
  609. }
  610. return true
  611. }
  612. for {
  613. success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
  614. if !checkForStreamQuota(it) {
  615. return false
  616. }
  617. if !checkForHeaderListSize(it) {
  618. return false
  619. }
  620. return true
  621. }, hdr)
  622. if err != nil {
  623. return nil, err
  624. }
  625. if success {
  626. break
  627. }
  628. if hdrListSizeErr != nil {
  629. return nil, hdrListSizeErr
  630. }
  631. firstTry = false
  632. select {
  633. case <-ch:
  634. case <-s.ctx.Done():
  635. return nil, ContextErr(s.ctx.Err())
  636. case <-t.goAway:
  637. return nil, errStreamDrain
  638. case <-t.ctx.Done():
  639. return nil, ErrConnClosing
  640. }
  641. }
  642. if t.statsHandler != nil {
  643. header, ok := metadata.FromOutgoingContext(ctx)
  644. if ok {
  645. header.Set("user-agent", t.userAgent)
  646. } else {
  647. header = metadata.Pairs("user-agent", t.userAgent)
  648. }
  649. outHeader := &stats.OutHeader{
  650. Client: true,
  651. FullMethod: callHdr.Method,
  652. RemoteAddr: t.remoteAddr,
  653. LocalAddr: t.localAddr,
  654. Compression: callHdr.SendCompress,
  655. Header: header,
  656. }
  657. t.statsHandler.HandleRPC(s.ctx, outHeader)
  658. }
  659. return s, nil
  660. }
  661. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  662. // This must not be executed in reader's goroutine.
  663. func (t *http2Client) CloseStream(s *Stream, err error) {
  664. var (
  665. rst bool
  666. rstCode http2.ErrCode
  667. )
  668. if err != nil {
  669. rst = true
  670. rstCode = http2.ErrCodeCancel
  671. }
  672. t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
  673. }
  674. func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
  675. // Set stream status to done.
  676. if s.swapState(streamDone) == streamDone {
  677. // If it was already done, return. If multiple closeStream calls
  678. // happen simultaneously, wait for the first to finish.
  679. <-s.done
  680. return
  681. }
  682. // status and trailers can be updated here without any synchronization because the stream goroutine will
  683. // only read it after it sees an io.EOF error from read or write and we'll write those errors
  684. // only after updating this.
  685. s.status = st
  686. if len(mdata) > 0 {
  687. s.trailer = mdata
  688. }
  689. if err != nil {
  690. // This will unblock reads eventually.
  691. s.write(recvMsg{err: err})
  692. }
  693. // If headerChan isn't closed, then close it.
  694. if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
  695. s.noHeaders = true
  696. close(s.headerChan)
  697. }
  698. cleanup := &cleanupStream{
  699. streamID: s.id,
  700. onWrite: func() {
  701. t.mu.Lock()
  702. if t.activeStreams != nil {
  703. delete(t.activeStreams, s.id)
  704. }
  705. t.mu.Unlock()
  706. if channelz.IsOn() {
  707. if eosReceived {
  708. atomic.AddInt64(&t.czData.streamsSucceeded, 1)
  709. } else {
  710. atomic.AddInt64(&t.czData.streamsFailed, 1)
  711. }
  712. }
  713. },
  714. rst: rst,
  715. rstCode: rstCode,
  716. }
  717. addBackStreamQuota := func(interface{}) bool {
  718. t.streamQuota++
  719. if t.streamQuota > 0 && t.waitingStreams > 0 {
  720. select {
  721. case t.streamsQuotaAvailable <- struct{}{}:
  722. default:
  723. }
  724. }
  725. return true
  726. }
  727. t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
  728. // This will unblock write.
  729. close(s.done)
  730. }
  731. // Close kicks off the shutdown process of the transport. This should be called
  732. // only once on a transport. Once it is called, the transport should not be
  733. // accessed any more.
  734. //
  735. // This method blocks until the addrConn that initiated this transport is
  736. // re-connected. This happens because t.onClose() begins reconnect logic at the
  737. // addrConn level and blocks until the addrConn is successfully connected.
  738. func (t *http2Client) Close() error {
  739. t.mu.Lock()
  740. // Make sure we only Close once.
  741. if t.state == closing {
  742. t.mu.Unlock()
  743. return nil
  744. }
  745. // Call t.onClose before setting the state to closing to prevent the client
  746. // from attempting to create new streams ASAP.
  747. t.onClose()
  748. t.state = closing
  749. streams := t.activeStreams
  750. t.activeStreams = nil
  751. if t.kpDormant {
  752. // If the keepalive goroutine is blocked on this condition variable, we
  753. // should unblock it so that the goroutine eventually exits.
  754. t.kpDormancyCond.Signal()
  755. }
  756. t.mu.Unlock()
  757. t.controlBuf.finish()
  758. t.cancel()
  759. err := t.conn.Close()
  760. if channelz.IsOn() {
  761. channelz.RemoveEntry(t.channelzID)
  762. }
  763. // Notify all active streams.
  764. for _, s := range streams {
  765. t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
  766. }
  767. if t.statsHandler != nil {
  768. connEnd := &stats.ConnEnd{
  769. Client: true,
  770. }
  771. t.statsHandler.HandleConn(t.ctx, connEnd)
  772. }
  773. return err
  774. }
  775. // GracefulClose sets the state to draining, which prevents new streams from
  776. // being created and causes the transport to be closed when the last active
  777. // stream is closed. If there are no active streams, the transport is closed
  778. // immediately. This does nothing if the transport is already draining or
  779. // closing.
  780. func (t *http2Client) GracefulClose() {
  781. t.mu.Lock()
  782. // Make sure we move to draining only from active.
  783. if t.state == draining || t.state == closing {
  784. t.mu.Unlock()
  785. return
  786. }
  787. t.state = draining
  788. active := len(t.activeStreams)
  789. t.mu.Unlock()
  790. if active == 0 {
  791. t.Close()
  792. return
  793. }
  794. t.controlBuf.put(&incomingGoAway{})
  795. }
  796. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  797. // should proceed only if Write returns nil.
  798. func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  799. if opts.Last {
  800. // If it's the last message, update stream state.
  801. if !s.compareAndSwapState(streamActive, streamWriteDone) {
  802. return errStreamDone
  803. }
  804. } else if s.getState() != streamActive {
  805. return errStreamDone
  806. }
  807. df := &dataFrame{
  808. streamID: s.id,
  809. endStream: opts.Last,
  810. }
  811. if hdr != nil || data != nil { // If it's not an empty data frame.
  812. // Add some data to grpc message header so that we can equally
  813. // distribute bytes across frames.
  814. emptyLen := http2MaxFrameLen - len(hdr)
  815. if emptyLen > len(data) {
  816. emptyLen = len(data)
  817. }
  818. hdr = append(hdr, data[:emptyLen]...)
  819. data = data[emptyLen:]
  820. df.h, df.d = hdr, data
  821. // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
  822. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  823. return err
  824. }
  825. }
  826. return t.controlBuf.put(df)
  827. }
  828. func (t *http2Client) getStream(f http2.Frame) *Stream {
  829. t.mu.Lock()
  830. s := t.activeStreams[f.Header().StreamID]
  831. t.mu.Unlock()
  832. return s
  833. }
  834. // adjustWindow sends out extra window update over the initial window size
  835. // of stream if the application is requesting data larger in size than
  836. // the window.
  837. func (t *http2Client) adjustWindow(s *Stream, n uint32) {
  838. if w := s.fc.maybeAdjust(n); w > 0 {
  839. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  840. }
  841. }
  842. // updateWindow adjusts the inbound quota for the stream.
  843. // Window updates will be sent out when the cumulative quota
  844. // exceeds the corresponding threshold.
  845. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  846. if w := s.fc.onRead(n); w > 0 {
  847. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  848. }
  849. }
  850. // updateFlowControl updates the incoming flow control windows
  851. // for the transport and the stream based on the current bdp
  852. // estimation.
  853. func (t *http2Client) updateFlowControl(n uint32) {
  854. t.mu.Lock()
  855. for _, s := range t.activeStreams {
  856. s.fc.newLimit(n)
  857. }
  858. t.mu.Unlock()
  859. updateIWS := func(interface{}) bool {
  860. t.initialWindowSize = int32(n)
  861. return true
  862. }
  863. t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
  864. t.controlBuf.put(&outgoingSettings{
  865. ss: []http2.Setting{
  866. {
  867. ID: http2.SettingInitialWindowSize,
  868. Val: n,
  869. },
  870. },
  871. })
  872. }
  873. func (t *http2Client) handleData(f *http2.DataFrame) {
  874. size := f.Header().Length
  875. var sendBDPPing bool
  876. if t.bdpEst != nil {
  877. sendBDPPing = t.bdpEst.add(size)
  878. }
  879. // Decouple connection's flow control from application's read.
  880. // An update on connection's flow control should not depend on
  881. // whether user application has read the data or not. Such a
  882. // restriction is already imposed on the stream's flow control,
  883. // and therefore the sender will be blocked anyways.
  884. // Decoupling the connection flow control will prevent other
  885. // active(fast) streams from starving in presence of slow or
  886. // inactive streams.
  887. //
  888. if w := t.fc.onData(size); w > 0 {
  889. t.controlBuf.put(&outgoingWindowUpdate{
  890. streamID: 0,
  891. increment: w,
  892. })
  893. }
  894. if sendBDPPing {
  895. // Avoid excessive ping detection (e.g. in an L7 proxy)
  896. // by sending a window update prior to the BDP ping.
  897. if w := t.fc.reset(); w > 0 {
  898. t.controlBuf.put(&outgoingWindowUpdate{
  899. streamID: 0,
  900. increment: w,
  901. })
  902. }
  903. t.controlBuf.put(bdpPing)
  904. }
  905. // Select the right stream to dispatch.
  906. s := t.getStream(f)
  907. if s == nil {
  908. return
  909. }
  910. if size > 0 {
  911. if err := s.fc.onData(size); err != nil {
  912. t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
  913. return
  914. }
  915. if f.Header().Flags.Has(http2.FlagDataPadded) {
  916. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  917. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  918. }
  919. }
  920. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  921. // guarantee f.Data() is consumed before the arrival of next frame.
  922. // Can this copy be eliminated?
  923. if len(f.Data()) > 0 {
  924. buffer := t.bufferPool.get()
  925. buffer.Reset()
  926. buffer.Write(f.Data())
  927. s.write(recvMsg{buffer: buffer})
  928. }
  929. }
  930. // The server has closed the stream without sending trailers. Record that
  931. // the read direction is closed, and set the status appropriately.
  932. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
  933. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
  934. }
  935. }
  936. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  937. s := t.getStream(f)
  938. if s == nil {
  939. return
  940. }
  941. if f.ErrCode == http2.ErrCodeRefusedStream {
  942. // The stream was unprocessed by the server.
  943. atomic.StoreUint32(&s.unprocessed, 1)
  944. }
  945. statusCode, ok := http2ErrConvTab[f.ErrCode]
  946. if !ok {
  947. warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
  948. statusCode = codes.Unknown
  949. }
  950. if statusCode == codes.Canceled {
  951. if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
  952. // Our deadline was already exceeded, and that was likely the cause
  953. // of this cancelation. Alter the status code accordingly.
  954. statusCode = codes.DeadlineExceeded
  955. }
  956. }
  957. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
  958. }
  959. func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
  960. if f.IsAck() {
  961. return
  962. }
  963. var maxStreams *uint32
  964. var ss []http2.Setting
  965. var updateFuncs []func()
  966. f.ForeachSetting(func(s http2.Setting) error {
  967. switch s.ID {
  968. case http2.SettingMaxConcurrentStreams:
  969. maxStreams = new(uint32)
  970. *maxStreams = s.Val
  971. case http2.SettingMaxHeaderListSize:
  972. updateFuncs = append(updateFuncs, func() {
  973. t.maxSendHeaderListSize = new(uint32)
  974. *t.maxSendHeaderListSize = s.Val
  975. })
  976. default:
  977. ss = append(ss, s)
  978. }
  979. return nil
  980. })
  981. if isFirst && maxStreams == nil {
  982. maxStreams = new(uint32)
  983. *maxStreams = math.MaxUint32
  984. }
  985. sf := &incomingSettings{
  986. ss: ss,
  987. }
  988. if maxStreams != nil {
  989. updateStreamQuota := func() {
  990. delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
  991. t.maxConcurrentStreams = *maxStreams
  992. t.streamQuota += delta
  993. if delta > 0 && t.waitingStreams > 0 {
  994. close(t.streamsQuotaAvailable) // wake all of them up.
  995. t.streamsQuotaAvailable = make(chan struct{}, 1)
  996. }
  997. }
  998. updateFuncs = append(updateFuncs, updateStreamQuota)
  999. }
  1000. t.controlBuf.executeAndPut(func(interface{}) bool {
  1001. for _, f := range updateFuncs {
  1002. f()
  1003. }
  1004. return true
  1005. }, sf)
  1006. }
  1007. func (t *http2Client) handlePing(f *http2.PingFrame) {
  1008. if f.IsAck() {
  1009. // Maybe it's a BDP ping.
  1010. if t.bdpEst != nil {
  1011. t.bdpEst.calculate(f.Data)
  1012. }
  1013. return
  1014. }
  1015. pingAck := &ping{ack: true}
  1016. copy(pingAck.data[:], f.Data[:])
  1017. t.controlBuf.put(pingAck)
  1018. }
// handleGoAway processes a GOAWAY frame received from the server.
//
// It does nothing when the transport is already closing. The transport is
// closed outright when the frame's LastStreamID is non-zero and even, or when
// a later GOAWAY carries a LastStreamID greater than the previous one. On the
// first GOAWAY it records the reason, closes t.goAway, queues an
// incomingGoAway item, calls t.onGoAway and moves the transport to draining.
// Active streams whose IDs lie above LastStreamID (and at or below the
// previous GOAWAY's ID, if any) are marked unprocessed and closed. If no
// active streams are registered afterwards, the transport is closed.
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
		infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
	}
	id := f.LastStreamID
	// A non-zero, even LastStreamID is treated as invalid here: close the
	// transport.
	if id > 0 && id%2 != 1 {
		t.mu.Unlock()
		t.Close()
		return
	}
	// A client can receive multiple GoAways from the server (see
	// https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
	// sent after an RTT delay with the ID of the last stream the server will
	// process.
	//
	// Therefore, when we get the first GoAway we don't necessarily close any
	// streams. While in case of second GoAway we close all streams created after
	// the GoAwayId. This way streams that were in-flight while the GoAway from
	// server was being sent don't get killed.
	select {
	case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
		if id > t.prevGoAwayID {
			t.mu.Unlock()
			t.Close()
			return
		}
	default:
		t.setGoAwayReason(f)
		close(t.goAway)
		t.controlBuf.put(&incomingGoAway{})
		// Notify the clientconn about the GOAWAY before we set the state to
		// draining, to allow the client to stop attempting to create streams
		// before disallowing new streams on this connection.
		t.onGoAway(t.goAwayReason)
		t.state = draining
	}
	// All streams with IDs greater than the GoAwayId
	// and smaller than the previous GoAway ID should be killed.
	upperLimit := t.prevGoAwayID
	if upperLimit == 0 { // This is the first GoAway Frame.
		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
	}
	for streamID, stream := range t.activeStreams {
		if streamID > id && streamID <= upperLimit {
			// The stream was unprocessed by the server.
			atomic.StoreUint32(&stream.unprocessed, 1)
			t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
		}
	}
	t.prevGoAwayID = id
	// Read the count while still holding the lock; Close is called only after
	// the lock has been released.
	active := len(t.activeStreams)
	t.mu.Unlock()
	if active == 0 {
		t.Close()
	}
}
  1082. // setGoAwayReason sets the value of t.goAwayReason based
  1083. // on the GoAway frame received.
  1084. // It expects a lock on transport's mutext to be held by
  1085. // the caller.
  1086. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
  1087. t.goAwayReason = GoAwayNoReason
  1088. switch f.ErrCode {
  1089. case http2.ErrCodeEnhanceYourCalm:
  1090. if string(f.DebugData()) == "too_many_pings" {
  1091. t.goAwayReason = GoAwayTooManyPings
  1092. }
  1093. }
  1094. }
  1095. func (t *http2Client) GetGoAwayReason() GoAwayReason {
  1096. t.mu.Lock()
  1097. defer t.mu.Unlock()
  1098. return t.goAwayReason
  1099. }
  1100. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  1101. t.controlBuf.put(&incomingWindowUpdate{
  1102. streamID: f.Header().StreamID,
  1103. increment: f.Increment,
  1104. })
  1105. }
// operateHeaders takes action on the decoded headers.
//
// Frames for streams that are not active are ignored. A HEADERS frame that is
// neither the first one on the stream nor carries END_STREAM closes the stream
// with a protocol error, as does a frame whose headers fail to decode. The
// first HEADERS frame closes headerChan, publishing either the response
// headers or (for Trailers-Only) the absence of headers. A frame with
// END_STREAM closes the stream with the decoded status and metadata.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
	s := t.getStream(frame)
	if s == nil {
		return
	}
	endStream := frame.StreamEnded()
	atomic.StoreUint32(&s.bytesReceived, 1)
	// headerChan still open means no HEADERS frame has been handled on this
	// stream yet.
	initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
	if !initialHeader && !endStream {
		// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
		st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
		t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
		return
	}
	state := &decodeState{}
	// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
	state.data.isGRPC = !initialHeader
	if err := state.decodeHeader(frame); err != nil {
		t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
		return
	}
	isHeader := false
	// Report to the stats handler on the way out, once isHeader has its final
	// value and s.header / s.trailer have been populated by the code below.
	defer func() {
		if t.statsHandler != nil {
			if isHeader {
				inHeader := &stats.InHeader{
					Client:     true,
					WireLength: int(frame.Header().Length),
					Header:     s.header.Copy(),
				}
				t.statsHandler.HandleRPC(s.ctx, inHeader)
			} else {
				inTrailer := &stats.InTrailer{
					Client:     true,
					WireLength: int(frame.Header().Length),
					Trailer:    s.trailer.Copy(),
				}
				t.statsHandler.HandleRPC(s.ctx, inTrailer)
			}
		}
	}()
	// If headerChan hasn't been closed yet
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		s.headerValid = true
		if !endStream {
			// HEADERS frame block carries a Response-Headers.
			isHeader = true
			// These values can be set without any synchronization because
			// stream goroutine will read it only after seeing a closed
			// headerChan which we'll close after setting this.
			s.recvCompress = state.data.encoding
			if len(state.data.mdata) > 0 {
				s.header = state.data.mdata
			}
		} else {
			// HEADERS frame block carries a Trailers-Only.
			s.noHeaders = true
		}
		close(s.headerChan)
	}
	if !endStream {
		return
	}
	// if client received END_STREAM from server while stream was still active, send RST_STREAM
	rst := s.getState() == streamActive
	t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
}
  1174. // reader runs as a separate goroutine in charge of reading data from network
  1175. // connection.
  1176. //
  1177. // TODO(zhaoq): currently one reader per transport. Investigate whether this is
  1178. // optimal.
  1179. // TODO(zhaoq): Check the validity of the incoming frame sequence.
  1180. func (t *http2Client) reader() {
  1181. defer close(t.readerDone)
  1182. // Check the validity of server preface.
  1183. frame, err := t.framer.fr.ReadFrame()
  1184. if err != nil {
  1185. t.Close() // this kicks off resetTransport, so must be last before return
  1186. return
  1187. }
  1188. t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
  1189. if t.keepaliveEnabled {
  1190. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  1191. }
  1192. sf, ok := frame.(*http2.SettingsFrame)
  1193. if !ok {
  1194. t.Close() // this kicks off resetTransport, so must be last before return
  1195. return
  1196. }
  1197. t.onPrefaceReceipt()
  1198. t.handleSettings(sf, true)
  1199. // loop to keep reading incoming messages on this transport.
  1200. for {
  1201. t.controlBuf.throttle()
  1202. frame, err := t.framer.fr.ReadFrame()
  1203. if t.keepaliveEnabled {
  1204. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  1205. }
  1206. if err != nil {
  1207. // Abort an active stream if the http2.Framer returns a
  1208. // http2.StreamError. This can happen only if the server's response
  1209. // is malformed http2.
  1210. if se, ok := err.(http2.StreamError); ok {
  1211. t.mu.Lock()
  1212. s := t.activeStreams[se.StreamID]
  1213. t.mu.Unlock()
  1214. if s != nil {
  1215. // use error detail to provide better err message
  1216. code := http2ErrConvTab[se.Code]
  1217. msg := t.framer.fr.ErrorDetail().Error()
  1218. t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
  1219. }
  1220. continue
  1221. } else {
  1222. // Transport error.
  1223. t.Close()
  1224. return
  1225. }
  1226. }
  1227. switch frame := frame.(type) {
  1228. case *http2.MetaHeadersFrame:
  1229. t.operateHeaders(frame)
  1230. case *http2.DataFrame:
  1231. t.handleData(frame)
  1232. case *http2.RSTStreamFrame:
  1233. t.handleRSTStream(frame)
  1234. case *http2.SettingsFrame:
  1235. t.handleSettings(frame, false)
  1236. case *http2.PingFrame:
  1237. t.handlePing(frame)
  1238. case *http2.GoAwayFrame:
  1239. t.handleGoAway(frame)
  1240. case *http2.WindowUpdateFrame:
  1241. t.handleWindowUpdate(frame)
  1242. default:
  1243. errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
  1244. }
  1245. }
  1246. }
  1247. func minTime(a, b time.Duration) time.Duration {
  1248. if a < b {
  1249. return a
  1250. }
  1251. return b
  1252. }
// keepalive, running in a separate goroutine, makes sure the connection is
// alive by sending pings.
//
// Each time the timer fires it checks t.lastRead for read activity since the
// previous check. With activity, the timer is re-armed relative to the last
// read. Without activity, a ping is queued (unless one is already
// outstanding), and the transport is closed once an outstanding ping has used
// up t.kp.Timeout. When there are no active streams and
// t.kp.PermitWithoutStream is false, the goroutine waits on t.kpDormancyCond
// instead of pinging. It returns when t.ctx is done, when the transport is
// found in the closing state, or after closing the transport itself.
func (t *http2Client) keepalive() {
	p := &ping{data: [8]byte{}}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	timeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	timer := time.NewTimer(t.kp.Time)
	for {
		select {
		case <-timer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were here.
				outstandingPing = false
				// Next timer should fire at kp.Time seconds from lastRead time.
				timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			// No read activity, and the outstanding ping's timeout has been
			// fully consumed: give up on the connection.
			if outstandingPing && timeoutLeft <= 0 {
				t.Close()
				return
			}
			t.mu.Lock()
			if t.state == closing {
				// If the transport is closing, we should exit from the
				// keepalive goroutine here. If not, we could have a race
				// between the call to Signal() from Close() and the call to
				// Wait() here, whereby the keepalive goroutine ends up
				// blocking on the condition variable which will never be
				// signalled again.
				t.mu.Unlock()
				return
			}
			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
				// If a ping was sent out previously (because there were active
				// streams at that point) which wasn't acked and its timeout
				// hadn't fired, but we got here and are about to go dormant,
				// we should make sure that we unconditionally send a ping once
				// we awaken.
				outstandingPing = false
				t.kpDormant = true
				t.kpDormancyCond.Wait()
			}
			t.kpDormant = false
			t.mu.Unlock()
			// We get here either because we were dormant and a new stream was
			// created which unblocked the Wait() call, or because the
			// keepalive timer expired. In both cases, we need to send a ping.
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				timeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, timeoutLeft)
			timeoutLeft -= sleepDuration
			timer.Reset(sleepDuration)
		case <-t.ctx.Done():
			// Stop the timer; if it had already fired, drain its channel
			// before returning.
			if !timer.Stop() {
				<-timer.C
			}
			return
		}
	}
}
// Error returns the Done channel of the transport's context, which is closed
// when that context is done.
func (t *http2Client) Error() <-chan struct{} {
	return t.ctx.Done()
}
// GoAway returns the t.goAway channel, which handleGoAway closes when the
// first GOAWAY frame is processed.
func (t *http2Client) GoAway() <-chan struct{} {
	return t.goAway
}
  1336. func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
  1337. s := channelz.SocketInternalMetric{
  1338. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1339. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1340. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1341. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1342. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1343. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1344. LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1345. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1346. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1347. LocalFlowControlWindow: int64(t.fc.getSize()),
  1348. SocketOptions: channelz.GetSocketOption(t.conn),
  1349. LocalAddr: t.localAddr,
  1350. RemoteAddr: t.remoteAddr,
  1351. // RemoteName :
  1352. }
  1353. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1354. s.Security = au.GetSecurityValue()
  1355. }
  1356. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1357. return &s
  1358. }
// RemoteAddr returns the remote address stored on the transport.
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
  1360. func (t *http2Client) IncrMsgSent() {
  1361. atomic.AddInt64(&t.czData.msgSent, 1)
  1362. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1363. }
  1364. func (t *http2Client) IncrMsgRecv() {
  1365. atomic.AddInt64(&t.czData.msgRecv, 1)
  1366. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1367. }
  1368. func (t *http2Client) getOutFlowWindow() int64 {
  1369. resp := make(chan uint32, 1)
  1370. timer := time.NewTimer(time.Second)
  1371. defer timer.Stop()
  1372. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1373. select {
  1374. case sz := <-resp:
  1375. return int64(sz)
  1376. case <-t.ctxDone:
  1377. return -1
  1378. case <-timer.C:
  1379. return -2
  1380. }
  1381. }