clientconn.go 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "math"
  24. "net"
  25. "reflect"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "google.golang.org/grpc/balancer"
  31. "google.golang.org/grpc/balancer/base"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/connectivity"
  34. "google.golang.org/grpc/credentials"
  35. "google.golang.org/grpc/internal/backoff"
  36. "google.golang.org/grpc/internal/channelz"
  37. "google.golang.org/grpc/internal/grpcsync"
  38. "google.golang.org/grpc/internal/grpcutil"
  39. "google.golang.org/grpc/internal/transport"
  40. "google.golang.org/grpc/keepalive"
  41. "google.golang.org/grpc/resolver"
  42. "google.golang.org/grpc/serviceconfig"
  43. "google.golang.org/grpc/status"
  44. _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
  45. _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
  46. _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
  47. )
  // Dial-related constants used by ClientConn.
const (
	// grpclbName is the registered name of the grpclb balancer. It must stay
	// in sync with grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"

	// minConnectTimeout is the minimum time given to a connection attempt to
	// complete.
	minConnectTimeout = 20 * time.Second
)
  54. var (
  55. // ErrClientConnClosing indicates that the operation is illegal because
  56. // the ClientConn is closing.
  57. //
  58. // Deprecated: this error should not be relied upon by users; use the status
  59. // code of Canceled instead.
  60. ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
  61. // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
  62. errConnDrain = errors.New("grpc: the connection is drained")
  63. // errConnClosing indicates that the connection is closing.
  64. errConnClosing = errors.New("grpc: the connection is closing")
  65. // errBalancerClosed indicates that the balancer is closed.
  66. errBalancerClosed = errors.New("grpc: balancer is closed")
  67. // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
  68. // service config.
  69. invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
  70. )
  // Credential-configuration errors returned from Dial and DialContext when the
// transport-security dial options are inconsistent.
var (
	// errNoTransportSecurity: neither transport credentials nor a credentials
	// bundle was supplied, and WithInsecure was not used to opt out of
	// security explicitly.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

	// errTransportCredsAndBundle: a credentials.Bundle was combined with
	// individual TransportCredentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")

	// errTransportCredentialsMissing: per-RPC credentials that require
	// transport security (e.g. an OAuth2 token) were configured on an
	// insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

	// errCredentialsConflict: both grpc.WithTransportCredentials() and
	// grpc.WithInsecure() were used for the same connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
  // Default message-size limits and frame buffer sizes for client connections.
const (
	// defaultClientMaxReceiveMessageSize is the default client-side maximum
	// receive message size: 4 MiB. NOTE(review): presumably used when no
	// dial/call option overrides it — confirm against dialoptions.go.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is the default client-side maximum send
	// message size (math.MaxInt32 bytes, i.e. effectively unbounded).
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize specifies the buffer size for sending frames.
	defaultWriteBufSize = 32 * 1024
	// defaultReadBufSize is the read-side counterpart of defaultWriteBufSize
	// (32 KiB). NOTE(review): presumably the buffer size for receiving
	// frames — confirm at its use site.
	defaultReadBufSize = 32 * 1024
)
  95. // Dial creates a client connection to the given target.
  96. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  97. return DialContext(context.Background(), target, opts...)
  98. }
  99. // DialContext creates a client connection to the given target. By default, it's
  100. // a non-blocking dial (the function won't wait for connections to be
  101. // established, and connecting happens in the background). To make it a blocking
  102. // dial, use WithBlock() dial option.
  103. //
  104. // In the non-blocking case, the ctx does not act against the connection. It
  105. // only controls the setup steps.
  106. //
  107. // In the blocking case, ctx can be used to cancel or expire the pending
  108. // connection. Once this function returns, the cancellation and expiration of
  109. // ctx will be noop. Users should call ClientConn.Close to terminate all the
  110. // pending operations after this function returns.
  111. //
  112. // The target name syntax is defined in
  113. // https://github.com/grpc/grpc/blob/master/doc/naming.md.
  114. // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
  115. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  116. cc := &ClientConn{
  117. target: target,
  118. csMgr: &connectivityStateManager{},
  119. conns: make(map[*addrConn]struct{}),
  120. dopts: defaultDialOptions(),
  121. blockingpicker: newPickerWrapper(),
  122. czData: new(channelzData),
  123. firstResolveEvent: grpcsync.NewEvent(),
  124. }
  125. cc.retryThrottler.Store((*retryThrottler)(nil))
  126. cc.ctx, cc.cancel = context.WithCancel(context.Background())
  127. for _, opt := range opts {
  128. opt.apply(&cc.dopts)
  129. }
  130. chainUnaryClientInterceptors(cc)
  131. chainStreamClientInterceptors(cc)
  132. defer func() {
  133. if err != nil {
  134. cc.Close()
  135. }
  136. }()
  137. if channelz.IsOn() {
  138. if cc.dopts.channelzParentID != 0 {
  139. cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
  140. channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
  141. Desc: "Channel Created",
  142. Severity: channelz.CtINFO,
  143. Parent: &channelz.TraceEventDesc{
  144. Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
  145. Severity: channelz.CtINFO,
  146. },
  147. })
  148. } else {
  149. cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
  150. channelz.Info(cc.channelzID, "Channel Created")
  151. }
  152. cc.csMgr.channelzID = cc.channelzID
  153. }
  154. if !cc.dopts.insecure {
  155. if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
  156. return nil, errNoTransportSecurity
  157. }
  158. if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
  159. return nil, errTransportCredsAndBundle
  160. }
  161. } else {
  162. if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
  163. return nil, errCredentialsConflict
  164. }
  165. for _, cd := range cc.dopts.copts.PerRPCCredentials {
  166. if cd.RequireTransportSecurity() {
  167. return nil, errTransportCredentialsMissing
  168. }
  169. }
  170. }
  171. if cc.dopts.defaultServiceConfigRawJSON != nil {
  172. scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
  173. if scpr.Err != nil {
  174. return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
  175. }
  176. cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
  177. }
  178. cc.mkp = cc.dopts.copts.KeepaliveParams
  179. if cc.dopts.copts.Dialer == nil {
  180. cc.dopts.copts.Dialer = newProxyDialer(
  181. func(ctx context.Context, addr string) (net.Conn, error) {
  182. network, addr := parseDialTarget(addr)
  183. return (&net.Dialer{}).DialContext(ctx, network, addr)
  184. },
  185. )
  186. }
  187. if cc.dopts.copts.UserAgent != "" {
  188. cc.dopts.copts.UserAgent += " " + grpcUA
  189. } else {
  190. cc.dopts.copts.UserAgent = grpcUA
  191. }
  192. if cc.dopts.timeout > 0 {
  193. var cancel context.CancelFunc
  194. ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
  195. defer cancel()
  196. }
  197. defer func() {
  198. select {
  199. case <-ctx.Done():
  200. conn, err = nil, ctx.Err()
  201. default:
  202. }
  203. }()
  204. scSet := false
  205. if cc.dopts.scChan != nil {
  206. // Try to get an initial service config.
  207. select {
  208. case sc, ok := <-cc.dopts.scChan:
  209. if ok {
  210. cc.sc = &sc
  211. scSet = true
  212. }
  213. default:
  214. }
  215. }
  216. if cc.dopts.bs == nil {
  217. cc.dopts.bs = backoff.DefaultExponential
  218. }
  219. // Determine the resolver to use.
  220. cc.parsedTarget = grpcutil.ParseTarget(cc.target)
  221. channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
  222. resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
  223. if resolverBuilder == nil {
  224. // If resolver builder is still nil, the parsed target's scheme is
  225. // not registered. Fallback to default resolver and set Endpoint to
  226. // the original target.
  227. channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
  228. cc.parsedTarget = resolver.Target{
  229. Scheme: resolver.GetDefaultScheme(),
  230. Endpoint: target,
  231. }
  232. resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
  233. if resolverBuilder == nil {
  234. return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
  235. }
  236. }
  237. creds := cc.dopts.copts.TransportCredentials
  238. if creds != nil && creds.Info().ServerName != "" {
  239. cc.authority = creds.Info().ServerName
  240. } else if cc.dopts.insecure && cc.dopts.authority != "" {
  241. cc.authority = cc.dopts.authority
  242. } else {
  243. // Use endpoint from "scheme://authority/endpoint" as the default
  244. // authority for ClientConn.
  245. cc.authority = cc.parsedTarget.Endpoint
  246. }
  247. if cc.dopts.scChan != nil && !scSet {
  248. // Blocking wait for the initial service config.
  249. select {
  250. case sc, ok := <-cc.dopts.scChan:
  251. if ok {
  252. cc.sc = &sc
  253. }
  254. case <-ctx.Done():
  255. return nil, ctx.Err()
  256. }
  257. }
  258. if cc.dopts.scChan != nil {
  259. go cc.scWatcher()
  260. }
  261. var credsClone credentials.TransportCredentials
  262. if creds := cc.dopts.copts.TransportCredentials; creds != nil {
  263. credsClone = creds.Clone()
  264. }
  265. cc.balancerBuildOpts = balancer.BuildOptions{
  266. DialCreds: credsClone,
  267. CredsBundle: cc.dopts.copts.CredsBundle,
  268. Dialer: cc.dopts.copts.Dialer,
  269. ChannelzParentID: cc.channelzID,
  270. Target: cc.parsedTarget,
  271. }
  272. // Build the resolver.
  273. rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
  274. if err != nil {
  275. return nil, fmt.Errorf("failed to build resolver: %v", err)
  276. }
  277. cc.mu.Lock()
  278. cc.resolverWrapper = rWrapper
  279. cc.mu.Unlock()
  280. // A blocking dial blocks until the clientConn is ready.
  281. if cc.dopts.block {
  282. for {
  283. s := cc.GetState()
  284. if s == connectivity.Ready {
  285. break
  286. } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
  287. if err = cc.blockingpicker.connectionError(); err != nil {
  288. terr, ok := err.(interface {
  289. Temporary() bool
  290. })
  291. if ok && !terr.Temporary() {
  292. return nil, err
  293. }
  294. }
  295. }
  296. if !cc.WaitForStateChange(ctx, s) {
  297. // ctx got timeout or canceled.
  298. return nil, ctx.Err()
  299. }
  300. }
  301. }
  302. return cc, nil
  303. }
  304. // chainUnaryClientInterceptors chains all unary client interceptors into one.
  305. func chainUnaryClientInterceptors(cc *ClientConn) {
  306. interceptors := cc.dopts.chainUnaryInts
  307. // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
  308. // be executed before any other chained interceptors.
  309. if cc.dopts.unaryInt != nil {
  310. interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
  311. }
  312. var chainedInt UnaryClientInterceptor
  313. if len(interceptors) == 0 {
  314. chainedInt = nil
  315. } else if len(interceptors) == 1 {
  316. chainedInt = interceptors[0]
  317. } else {
  318. chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
  319. return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
  320. }
  321. }
  322. cc.dopts.unaryInt = chainedInt
  323. }
  324. // getChainUnaryInvoker recursively generate the chained unary invoker.
  325. func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
  326. if curr == len(interceptors)-1 {
  327. return finalInvoker
  328. }
  329. return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
  330. return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
  331. }
  332. }
  333. // chainStreamClientInterceptors chains all stream client interceptors into one.
  334. func chainStreamClientInterceptors(cc *ClientConn) {
  335. interceptors := cc.dopts.chainStreamInts
  336. // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
  337. // be executed before any other chained interceptors.
  338. if cc.dopts.streamInt != nil {
  339. interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
  340. }
  341. var chainedInt StreamClientInterceptor
  342. if len(interceptors) == 0 {
  343. chainedInt = nil
  344. } else if len(interceptors) == 1 {
  345. chainedInt = interceptors[0]
  346. } else {
  347. chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
  348. return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
  349. }
  350. }
  351. cc.dopts.streamInt = chainedInt
  352. }
  353. // getChainStreamer recursively generate the chained client stream constructor.
  354. func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
  355. if curr == len(interceptors)-1 {
  356. return finalStreamer
  357. }
  358. return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
  359. return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
  360. }
  361. }
  362. // connectivityStateManager keeps the connectivity.State of ClientConn.
  363. // This struct will eventually be exported so the balancers can access it.
  364. type connectivityStateManager struct {
  365. mu sync.Mutex
  366. state connectivity.State
  367. notifyChan chan struct{}
  368. channelzID int64
  369. }
  370. // updateState updates the connectivity.State of ClientConn.
  371. // If there's a change it notifies goroutines waiting on state change to
  372. // happen.
  373. func (csm *connectivityStateManager) updateState(state connectivity.State) {
  374. csm.mu.Lock()
  375. defer csm.mu.Unlock()
  376. if csm.state == connectivity.Shutdown {
  377. return
  378. }
  379. if csm.state == state {
  380. return
  381. }
  382. csm.state = state
  383. channelz.Infof(csm.channelzID, "Channel Connectivity change to %v", state)
  384. if csm.notifyChan != nil {
  385. // There are other goroutines waiting on this channel.
  386. close(csm.notifyChan)
  387. csm.notifyChan = nil
  388. }
  389. }
  390. func (csm *connectivityStateManager) getState() connectivity.State {
  391. csm.mu.Lock()
  392. defer csm.mu.Unlock()
  393. return csm.state
  394. }
  395. func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
  396. csm.mu.Lock()
  397. defer csm.mu.Unlock()
  398. if csm.notifyChan == nil {
  399. csm.notifyChan = make(chan struct{})
  400. }
  401. return csm.notifyChan
  402. }
  403. // ClientConnInterface defines the functions clients need to perform unary and
  404. // streaming RPCs. It is implemented by *ClientConn, and is only intended to
  405. // be referenced by generated code.
  406. type ClientConnInterface interface {
  407. // Invoke performs a unary RPC and returns after the response is received
  408. // into reply.
  409. Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
  410. // NewStream begins a streaming RPC.
  411. NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
  412. }
  413. // Assert *ClientConn implements ClientConnInterface.
  414. var _ ClientConnInterface = (*ClientConn)(nil)
  415. // ClientConn represents a virtual connection to a conceptual endpoint, to
  416. // perform RPCs.
  417. //
  418. // A ClientConn is free to have zero or more actual connections to the endpoint
  419. // based on configuration, load, etc. It is also free to determine which actual
  420. // endpoints to use and may change it every RPC, permitting client-side load
  421. // balancing.
  422. //
  423. // A ClientConn encapsulates a range of functionality including name
  424. // resolution, TCP connection establishment (with retries and backoff) and TLS
  425. // handshakes. It also handles errors on established connections by
  426. // re-resolving the name and reconnecting.
  427. type ClientConn struct {
  428. ctx context.Context
  429. cancel context.CancelFunc
  430. target string
  431. parsedTarget resolver.Target
  432. authority string
  433. dopts dialOptions
  434. csMgr *connectivityStateManager
  435. balancerBuildOpts balancer.BuildOptions
  436. blockingpicker *pickerWrapper
  437. mu sync.RWMutex
  438. resolverWrapper *ccResolverWrapper
  439. sc *ServiceConfig
  440. conns map[*addrConn]struct{}
  441. // Keepalive parameter can be updated if a GoAway is received.
  442. mkp keepalive.ClientParameters
  443. curBalancerName string
  444. balancerWrapper *ccBalancerWrapper
  445. retryThrottler atomic.Value
  446. firstResolveEvent *grpcsync.Event
  447. channelzID int64 // channelz unique identification number
  448. czData *channelzData
  449. }
  450. // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
  451. // ctx expires. A true value is returned in former case and false in latter.
  452. // This is an EXPERIMENTAL API.
  453. func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
  454. ch := cc.csMgr.getNotifyChan()
  455. if cc.csMgr.getState() != sourceState {
  456. return true
  457. }
  458. select {
  459. case <-ctx.Done():
  460. return false
  461. case <-ch:
  462. return true
  463. }
  464. }
  465. // GetState returns the connectivity.State of ClientConn.
  466. // This is an EXPERIMENTAL API.
  467. func (cc *ClientConn) GetState() connectivity.State {
  468. return cc.csMgr.getState()
  469. }
  470. func (cc *ClientConn) scWatcher() {
  471. for {
  472. select {
  473. case sc, ok := <-cc.dopts.scChan:
  474. if !ok {
  475. return
  476. }
  477. cc.mu.Lock()
  478. // TODO: load balance policy runtime change is ignored.
  479. // We may revisit this decision in the future.
  480. cc.sc = &sc
  481. cc.mu.Unlock()
  482. case <-cc.ctx.Done():
  483. return
  484. }
  485. }
  486. }
  487. // waitForResolvedAddrs blocks until the resolver has provided addresses or the
  488. // context expires. Returns nil unless the context expires first; otherwise
  489. // returns a status error based on the context.
  490. func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
  491. // This is on the RPC path, so we use a fast path to avoid the
  492. // more-expensive "select" below after the resolver has returned once.
  493. if cc.firstResolveEvent.HasFired() {
  494. return nil
  495. }
  496. select {
  497. case <-cc.firstResolveEvent.Done():
  498. return nil
  499. case <-ctx.Done():
  500. return status.FromContextError(ctx.Err()).Err()
  501. case <-cc.ctx.Done():
  502. return ErrClientConnClosing
  503. }
  504. }
  505. var emptyServiceConfig *ServiceConfig
  506. func init() {
  507. cfg := parseServiceConfig("{}")
  508. if cfg.Err != nil {
  509. panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
  510. }
  511. emptyServiceConfig = cfg.Config.(*ServiceConfig)
  512. }
  513. func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
  514. if cc.sc != nil {
  515. cc.applyServiceConfigAndBalancer(cc.sc, addrs)
  516. return
  517. }
  518. if cc.dopts.defaultServiceConfig != nil {
  519. cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
  520. } else {
  521. cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
  522. }
  523. }
  524. func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
  525. defer cc.firstResolveEvent.Fire()
  526. cc.mu.Lock()
  527. // Check if the ClientConn is already closed. Some fields (e.g.
  528. // balancerWrapper) are set to nil when closing the ClientConn, and could
  529. // cause nil pointer panic if we don't have this check.
  530. if cc.conns == nil {
  531. cc.mu.Unlock()
  532. return nil
  533. }
  534. if err != nil {
  535. // May need to apply the initial service config in case the resolver
  536. // doesn't support service configs, or doesn't provide a service config
  537. // with the new addresses.
  538. cc.maybeApplyDefaultServiceConfig(nil)
  539. if cc.balancerWrapper != nil {
  540. cc.balancerWrapper.resolverError(err)
  541. }
  542. // No addresses are valid with err set; return early.
  543. cc.mu.Unlock()
  544. return balancer.ErrBadResolverState
  545. }
  546. var ret error
  547. if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
  548. cc.maybeApplyDefaultServiceConfig(s.Addresses)
  549. // TODO: do we need to apply a failing LB policy if there is no
  550. // default, per the error handling design?
  551. } else {
  552. if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
  553. cc.applyServiceConfigAndBalancer(sc, s.Addresses)
  554. } else {
  555. ret = balancer.ErrBadResolverState
  556. if cc.balancerWrapper == nil {
  557. var err error
  558. if s.ServiceConfig.Err != nil {
  559. err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
  560. } else {
  561. err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
  562. }
  563. cc.blockingpicker.updatePicker(base.NewErrPicker(err))
  564. cc.csMgr.updateState(connectivity.TransientFailure)
  565. cc.mu.Unlock()
  566. return ret
  567. }
  568. }
  569. }
  570. var balCfg serviceconfig.LoadBalancingConfig
  571. if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
  572. balCfg = cc.sc.lbConfig.cfg
  573. }
  574. cbn := cc.curBalancerName
  575. bw := cc.balancerWrapper
  576. cc.mu.Unlock()
  577. if cbn != grpclbName {
  578. // Filter any grpclb addresses since we don't have the grpclb balancer.
  579. for i := 0; i < len(s.Addresses); {
  580. if s.Addresses[i].Type == resolver.GRPCLB {
  581. copy(s.Addresses[i:], s.Addresses[i+1:])
  582. s.Addresses = s.Addresses[:len(s.Addresses)-1]
  583. continue
  584. }
  585. i++
  586. }
  587. }
  588. uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
  589. if ret == nil {
  590. ret = uccsErr // prefer ErrBadResolver state since any other error is
  591. // currently meaningless to the caller.
  592. }
  593. return ret
  594. }
  595. // switchBalancer starts the switching from current balancer to the balancer
  596. // with the given name.
  597. //
  598. // It will NOT send the current address list to the new balancer. If needed,
  599. // caller of this function should send address list to the new balancer after
  600. // this function returns.
  601. //
  602. // Caller must hold cc.mu.
  603. func (cc *ClientConn) switchBalancer(name string) {
  604. if strings.EqualFold(cc.curBalancerName, name) {
  605. return
  606. }
  607. channelz.Infof(cc.channelzID, "ClientConn switching balancer to %q", name)
  608. if cc.dopts.balancerBuilder != nil {
  609. channelz.Info(cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
  610. return
  611. }
  612. if cc.balancerWrapper != nil {
  613. cc.balancerWrapper.close()
  614. }
  615. builder := balancer.Get(name)
  616. if builder == nil {
  617. channelz.Warningf(cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
  618. channelz.Infof(cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
  619. builder = newPickfirstBuilder()
  620. } else {
  621. channelz.Infof(cc.channelzID, "Channel switches to new LB policy %q", name)
  622. }
  623. cc.curBalancerName = builder.Name()
  624. cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
  625. }
  626. func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
  627. cc.mu.Lock()
  628. if cc.conns == nil {
  629. cc.mu.Unlock()
  630. return
  631. }
  632. // TODO(bar switching) send updates to all balancer wrappers when balancer
  633. // gracefully switching is supported.
  634. cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
  635. cc.mu.Unlock()
  636. }
  637. // newAddrConn creates an addrConn for addrs and adds it to cc.conns.
  638. //
  639. // Caller needs to make sure len(addrs) > 0.
  640. func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
  641. ac := &addrConn{
  642. state: connectivity.Idle,
  643. cc: cc,
  644. addrs: addrs,
  645. scopts: opts,
  646. dopts: cc.dopts,
  647. czData: new(channelzData),
  648. resetBackoff: make(chan struct{}),
  649. }
  650. ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
  651. // Track ac in cc. This needs to be done before any getTransport(...) is called.
  652. cc.mu.Lock()
  653. if cc.conns == nil {
  654. cc.mu.Unlock()
  655. return nil, ErrClientConnClosing
  656. }
  657. if channelz.IsOn() {
  658. ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
  659. channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
  660. Desc: "Subchannel Created",
  661. Severity: channelz.CtINFO,
  662. Parent: &channelz.TraceEventDesc{
  663. Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
  664. Severity: channelz.CtINFO,
  665. },
  666. })
  667. }
  668. cc.conns[ac] = struct{}{}
  669. cc.mu.Unlock()
  670. return ac, nil
  671. }
  672. // removeAddrConn removes the addrConn in the subConn from clientConn.
  673. // It also tears down the ac with the given error.
  674. func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
  675. cc.mu.Lock()
  676. if cc.conns == nil {
  677. cc.mu.Unlock()
  678. return
  679. }
  680. delete(cc.conns, ac)
  681. cc.mu.Unlock()
  682. ac.tearDown(err)
  683. }
  684. func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
  685. return &channelz.ChannelInternalMetric{
  686. State: cc.GetState(),
  687. Target: cc.target,
  688. CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
  689. CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
  690. CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
  691. LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
  692. }
  693. }
  694. // Target returns the target string of the ClientConn.
  695. // This is an EXPERIMENTAL API.
  696. func (cc *ClientConn) Target() string {
  697. return cc.target
  698. }
  699. func (cc *ClientConn) incrCallsStarted() {
  700. atomic.AddInt64(&cc.czData.callsStarted, 1)
  701. atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
  702. }
  703. func (cc *ClientConn) incrCallsSucceeded() {
  704. atomic.AddInt64(&cc.czData.callsSucceeded, 1)
  705. }
  706. func (cc *ClientConn) incrCallsFailed() {
  707. atomic.AddInt64(&cc.czData.callsFailed, 1)
  708. }
  709. // connect starts creating a transport.
  710. // It does nothing if the ac is not IDLE.
  711. // TODO(bar) Move this to the addrConn section.
  712. func (ac *addrConn) connect() error {
  713. ac.mu.Lock()
  714. if ac.state == connectivity.Shutdown {
  715. ac.mu.Unlock()
  716. return errConnClosing
  717. }
  718. if ac.state != connectivity.Idle {
  719. ac.mu.Unlock()
  720. return nil
  721. }
  722. // Update connectivity state within the lock to prevent subsequent or
  723. // concurrent calls from resetting the transport more than once.
  724. ac.updateConnectivityState(connectivity.Connecting, nil)
  725. ac.mu.Unlock()
  726. // Start a goroutine connecting to the server asynchronously.
  727. go ac.resetTransport()
  728. return nil
  729. }
  730. // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
  731. //
  732. // If ac is Connecting, it returns false. The caller should tear down the ac and
  733. // create a new one. Note that the backoff will be reset when this happens.
  734. //
  735. // If ac is TransientFailure, it updates ac.addrs and returns true. The updated
  736. // addresses will be picked up by retry in the next iteration after backoff.
  737. //
  738. // If ac is Shutdown or Idle, it updates ac.addrs and returns true.
  739. //
  740. // If ac is Ready, it checks whether current connected address of ac is in the
  741. // new addrs list.
  742. // - If true, it updates ac.addrs and returns true. The ac will keep using
  743. // the existing connection.
  744. // - If false, it does nothing and returns false.
  745. func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
  746. ac.mu.Lock()
  747. defer ac.mu.Unlock()
  748. channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
  749. if ac.state == connectivity.Shutdown ||
  750. ac.state == connectivity.TransientFailure ||
  751. ac.state == connectivity.Idle {
  752. ac.addrs = addrs
  753. return true
  754. }
  755. if ac.state == connectivity.Connecting {
  756. return false
  757. }
  758. // ac.state is Ready, try to find the connected address.
  759. var curAddrFound bool
  760. for _, a := range addrs {
  761. if reflect.DeepEqual(ac.curAddr, a) {
  762. curAddrFound = true
  763. break
  764. }
  765. }
  766. channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
  767. if curAddrFound {
  768. ac.addrs = addrs
  769. }
  770. return curAddrFound
  771. }
  772. // GetMethodConfig gets the method config of the input method.
  773. // If there's an exact match for input method (i.e. /service/method), we return
  774. // the corresponding MethodConfig.
  775. // If there isn't an exact match for the input method, we look for the default config
  776. // under the service (i.e /service/). If there is a default MethodConfig for
  777. // the service, we return it.
  778. // Otherwise, we return an empty MethodConfig.
  779. func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
  780. // TODO: Avoid the locking here.
  781. cc.mu.RLock()
  782. defer cc.mu.RUnlock()
  783. if cc.sc == nil {
  784. return MethodConfig{}
  785. }
  786. m, ok := cc.sc.Methods[method]
  787. if !ok {
  788. i := strings.LastIndex(method, "/")
  789. m = cc.sc.Methods[method[:i+1]]
  790. }
  791. return m
  792. }
  793. func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
  794. cc.mu.RLock()
  795. defer cc.mu.RUnlock()
  796. if cc.sc == nil {
  797. return nil
  798. }
  799. return cc.sc.healthCheckConfig
  800. }
  801. func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
  802. t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
  803. Ctx: ctx,
  804. FullMethodName: method,
  805. })
  806. if err != nil {
  807. return nil, nil, toRPCErr(err)
  808. }
  809. return t, done, nil
  810. }
  811. func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
  812. if sc == nil {
  813. // should never reach here.
  814. return
  815. }
  816. cc.sc = sc
  817. if cc.sc.retryThrottling != nil {
  818. newThrottler := &retryThrottler{
  819. tokens: cc.sc.retryThrottling.MaxTokens,
  820. max: cc.sc.retryThrottling.MaxTokens,
  821. thresh: cc.sc.retryThrottling.MaxTokens / 2,
  822. ratio: cc.sc.retryThrottling.TokenRatio,
  823. }
  824. cc.retryThrottler.Store(newThrottler)
  825. } else {
  826. cc.retryThrottler.Store((*retryThrottler)(nil))
  827. }
  828. if cc.dopts.balancerBuilder == nil {
  829. // Only look at balancer types and switch balancer if balancer dial
  830. // option is not set.
  831. var newBalancerName string
  832. if cc.sc != nil && cc.sc.lbConfig != nil {
  833. newBalancerName = cc.sc.lbConfig.name
  834. } else {
  835. var isGRPCLB bool
  836. for _, a := range addrs {
  837. if a.Type == resolver.GRPCLB {
  838. isGRPCLB = true
  839. break
  840. }
  841. }
  842. if isGRPCLB {
  843. newBalancerName = grpclbName
  844. } else if cc.sc != nil && cc.sc.LB != nil {
  845. newBalancerName = *cc.sc.LB
  846. } else {
  847. newBalancerName = PickFirstBalancerName
  848. }
  849. }
  850. cc.switchBalancer(newBalancerName)
  851. } else if cc.balancerWrapper == nil {
  852. // Balancer dial option was set, and this is the first time handling
  853. // resolved addresses. Build a balancer with dopts.balancerBuilder.
  854. cc.curBalancerName = cc.dopts.balancerBuilder.Name()
  855. cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
  856. }
  857. }
  858. func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
  859. cc.mu.RLock()
  860. r := cc.resolverWrapper
  861. cc.mu.RUnlock()
  862. if r == nil {
  863. return
  864. }
  865. go r.resolveNow(o)
  866. }
  867. // ResetConnectBackoff wakes up all subchannels in transient failure and causes
  868. // them to attempt another connection immediately. It also resets the backoff
  869. // times used for subsequent attempts regardless of the current state.
  870. //
  871. // In general, this function should not be used. Typical service or network
  872. // outages result in a reasonable client reconnection strategy by default.
  873. // However, if a previously unavailable network becomes available, this may be
  874. // used to trigger an immediate reconnect.
  875. //
  876. // This API is EXPERIMENTAL.
  877. func (cc *ClientConn) ResetConnectBackoff() {
  878. cc.mu.Lock()
  879. conns := cc.conns
  880. cc.mu.Unlock()
  881. for ac := range conns {
  882. ac.resetConnectBackoff()
  883. }
  884. }
  885. // Close tears down the ClientConn and all underlying connections.
  886. func (cc *ClientConn) Close() error {
  887. defer cc.cancel()
  888. cc.mu.Lock()
  889. if cc.conns == nil {
  890. cc.mu.Unlock()
  891. return ErrClientConnClosing
  892. }
  893. conns := cc.conns
  894. cc.conns = nil
  895. cc.csMgr.updateState(connectivity.Shutdown)
  896. rWrapper := cc.resolverWrapper
  897. cc.resolverWrapper = nil
  898. bWrapper := cc.balancerWrapper
  899. cc.balancerWrapper = nil
  900. cc.mu.Unlock()
  901. cc.blockingpicker.close()
  902. if rWrapper != nil {
  903. rWrapper.close()
  904. }
  905. if bWrapper != nil {
  906. bWrapper.close()
  907. }
  908. for ac := range conns {
  909. ac.tearDown(ErrClientConnClosing)
  910. }
  911. if channelz.IsOn() {
  912. ted := &channelz.TraceEventDesc{
  913. Desc: "Channel Deleted",
  914. Severity: channelz.CtINFO,
  915. }
  916. if cc.dopts.channelzParentID != 0 {
  917. ted.Parent = &channelz.TraceEventDesc{
  918. Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
  919. Severity: channelz.CtINFO,
  920. }
  921. }
  922. channelz.AddTraceEvent(cc.channelzID, 0, ted)
  923. // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
  924. // the entity being deleted, and thus prevent it from being deleted right away.
  925. channelz.RemoveEntry(cc.channelzID)
  926. }
  927. return nil
  928. }
  929. // addrConn is a network connection to a given address.
  930. type addrConn struct {
  931. ctx context.Context
  932. cancel context.CancelFunc
  933. cc *ClientConn
  934. dopts dialOptions
  935. acbw balancer.SubConn
  936. scopts balancer.NewSubConnOptions
  937. // transport is set when there's a viable transport (note: ac state may not be READY as LB channel
  938. // health checking may require server to report healthy to set ac to READY), and is reset
  939. // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
  940. // is received, transport is closed, ac has been torn down).
  941. transport transport.ClientTransport // The current transport.
  942. mu sync.Mutex
  943. curAddr resolver.Address // The current address.
  944. addrs []resolver.Address // All addresses that the resolver resolved to.
  945. // Use updateConnectivityState for updating addrConn's connectivity state.
  946. state connectivity.State
  947. backoffIdx int // Needs to be stateful for resetConnectBackoff.
  948. resetBackoff chan struct{}
  949. channelzID int64 // channelz unique identification number.
  950. czData *channelzData
  951. }
  952. // Note: this requires a lock on ac.mu.
  953. func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
  954. if ac.state == s {
  955. return
  956. }
  957. ac.state = s
  958. channelz.Infof(ac.channelzID, "Subchannel Connectivity change to %v", s)
  959. ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
  960. }
  961. // adjustParams updates parameters used to create transports upon
  962. // receiving a GoAway.
  963. func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
  964. switch r {
  965. case transport.GoAwayTooManyPings:
  966. v := 2 * ac.dopts.copts.KeepaliveParams.Time
  967. ac.cc.mu.Lock()
  968. if v > ac.cc.mkp.Time {
  969. ac.cc.mkp.Time = v
  970. }
  971. ac.cc.mu.Unlock()
  972. }
  973. }
  974. func (ac *addrConn) resetTransport() {
  975. for i := 0; ; i++ {
  976. if i > 0 {
  977. ac.cc.resolveNow(resolver.ResolveNowOptions{})
  978. }
  979. ac.mu.Lock()
  980. if ac.state == connectivity.Shutdown {
  981. ac.mu.Unlock()
  982. return
  983. }
  984. addrs := ac.addrs
  985. backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
  986. // This will be the duration that dial gets to finish.
  987. dialDuration := minConnectTimeout
  988. if ac.dopts.minConnectTimeout != nil {
  989. dialDuration = ac.dopts.minConnectTimeout()
  990. }
  991. if dialDuration < backoffFor {
  992. // Give dial more time as we keep failing to connect.
  993. dialDuration = backoffFor
  994. }
  995. // We can potentially spend all the time trying the first address, and
  996. // if the server accepts the connection and then hangs, the following
  997. // addresses will never be tried.
  998. //
  999. // The spec doesn't mention what should be done for multiple addresses.
  1000. // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
  1001. connectDeadline := time.Now().Add(dialDuration)
  1002. ac.updateConnectivityState(connectivity.Connecting, nil)
  1003. ac.transport = nil
  1004. ac.mu.Unlock()
  1005. newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
  1006. if err != nil {
  1007. // After exhausting all addresses, the addrConn enters
  1008. // TRANSIENT_FAILURE.
  1009. ac.mu.Lock()
  1010. if ac.state == connectivity.Shutdown {
  1011. ac.mu.Unlock()
  1012. return
  1013. }
  1014. ac.updateConnectivityState(connectivity.TransientFailure, err)
  1015. // Backoff.
  1016. b := ac.resetBackoff
  1017. ac.mu.Unlock()
  1018. timer := time.NewTimer(backoffFor)
  1019. select {
  1020. case <-timer.C:
  1021. ac.mu.Lock()
  1022. ac.backoffIdx++
  1023. ac.mu.Unlock()
  1024. case <-b:
  1025. timer.Stop()
  1026. case <-ac.ctx.Done():
  1027. timer.Stop()
  1028. return
  1029. }
  1030. continue
  1031. }
  1032. ac.mu.Lock()
  1033. if ac.state == connectivity.Shutdown {
  1034. ac.mu.Unlock()
  1035. newTr.Close()
  1036. return
  1037. }
  1038. ac.curAddr = addr
  1039. ac.transport = newTr
  1040. ac.backoffIdx = 0
  1041. hctx, hcancel := context.WithCancel(ac.ctx)
  1042. ac.startHealthCheck(hctx)
  1043. ac.mu.Unlock()
  1044. // Block until the created transport is down. And when this happens,
  1045. // we restart from the top of the addr list.
  1046. <-reconnect.Done()
  1047. hcancel()
  1048. // restart connecting - the top of the loop will set state to
  1049. // CONNECTING. This is against the current connectivity semantics doc,
  1050. // however it allows for graceful behavior for RPCs not yet dispatched
  1051. // - unfortunate timing would otherwise lead to the RPC failing even
  1052. // though the TRANSIENT_FAILURE state (called for by the doc) would be
  1053. // instantaneous.
  1054. //
  1055. // Ideally we should transition to Idle here and block until there is
  1056. // RPC activity that leads to the balancer requesting a reconnect of
  1057. // the associated SubConn.
  1058. }
  1059. }
  1060. // tryAllAddrs tries to creates a connection to the addresses, and stop when at the
  1061. // first successful one. It returns the transport, the address and a Event in
  1062. // the successful case. The Event fires when the returned transport disconnects.
  1063. func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
  1064. var firstConnErr error
  1065. for _, addr := range addrs {
  1066. ac.mu.Lock()
  1067. if ac.state == connectivity.Shutdown {
  1068. ac.mu.Unlock()
  1069. return nil, resolver.Address{}, nil, errConnClosing
  1070. }
  1071. ac.cc.mu.RLock()
  1072. ac.dopts.copts.KeepaliveParams = ac.cc.mkp
  1073. ac.cc.mu.RUnlock()
  1074. copts := ac.dopts.copts
  1075. if ac.scopts.CredsBundle != nil {
  1076. copts.CredsBundle = ac.scopts.CredsBundle
  1077. }
  1078. ac.mu.Unlock()
  1079. channelz.Infof(ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
  1080. newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
  1081. if err == nil {
  1082. return newTr, addr, reconnect, nil
  1083. }
  1084. if firstConnErr == nil {
  1085. firstConnErr = err
  1086. }
  1087. ac.cc.blockingpicker.updateConnectionError(err)
  1088. }
  1089. // Couldn't connect to any address.
  1090. return nil, resolver.Address{}, nil, firstConnErr
  1091. }
  1092. // createTransport creates a connection to addr. It returns the transport and a
  1093. // Event in the successful case. The Event fires when the returned transport
  1094. // disconnects.
  1095. func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
  1096. prefaceReceived := make(chan struct{})
  1097. onCloseCalled := make(chan struct{})
  1098. reconnect := grpcsync.NewEvent()
  1099. authority := ac.cc.authority
  1100. // addr.ServerName takes precedent over ClientConn authority, if present.
  1101. if addr.ServerName != "" {
  1102. authority = addr.ServerName
  1103. }
  1104. target := transport.TargetInfo{
  1105. Addr: addr.Addr,
  1106. Metadata: addr.Metadata,
  1107. Authority: authority,
  1108. }
  1109. once := sync.Once{}
  1110. onGoAway := func(r transport.GoAwayReason) {
  1111. ac.mu.Lock()
  1112. ac.adjustParams(r)
  1113. once.Do(func() {
  1114. if ac.state == connectivity.Ready {
  1115. // Prevent this SubConn from being used for new RPCs by setting its
  1116. // state to Connecting.
  1117. //
  1118. // TODO: this should be Idle when grpc-go properly supports it.
  1119. ac.updateConnectivityState(connectivity.Connecting, nil)
  1120. }
  1121. })
  1122. ac.mu.Unlock()
  1123. reconnect.Fire()
  1124. }
  1125. onClose := func() {
  1126. ac.mu.Lock()
  1127. once.Do(func() {
  1128. if ac.state == connectivity.Ready {
  1129. // Prevent this SubConn from being used for new RPCs by setting its
  1130. // state to Connecting.
  1131. //
  1132. // TODO: this should be Idle when grpc-go properly supports it.
  1133. ac.updateConnectivityState(connectivity.Connecting, nil)
  1134. }
  1135. })
  1136. ac.mu.Unlock()
  1137. close(onCloseCalled)
  1138. reconnect.Fire()
  1139. }
  1140. onPrefaceReceipt := func() {
  1141. close(prefaceReceived)
  1142. }
  1143. connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
  1144. defer cancel()
  1145. if channelz.IsOn() {
  1146. copts.ChannelzParentID = ac.channelzID
  1147. }
  1148. newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
  1149. if err != nil {
  1150. // newTr is either nil, or closed.
  1151. channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
  1152. return nil, nil, err
  1153. }
  1154. select {
  1155. case <-time.After(time.Until(connectDeadline)):
  1156. // We didn't get the preface in time.
  1157. newTr.Close()
  1158. channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
  1159. return nil, nil, errors.New("timed out waiting for server handshake")
  1160. case <-prefaceReceived:
  1161. // We got the preface - huzzah! things are good.
  1162. case <-onCloseCalled:
  1163. // The transport has already closed - noop.
  1164. return nil, nil, errors.New("connection closed")
  1165. // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
  1166. }
  1167. return newTr, reconnect, nil
  1168. }
  1169. // startHealthCheck starts the health checking stream (RPC) to watch the health
  1170. // stats of this connection if health checking is requested and configured.
  1171. //
  1172. // LB channel health checking is enabled when all requirements below are met:
  1173. // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
  1174. // 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
  1175. // 3. a service config with non-empty healthCheckConfig field is provided
  1176. // 4. the load balancer requests it
  1177. //
  1178. // It sets addrConn to READY if the health checking stream is not started.
  1179. //
  1180. // Caller must hold ac.mu.
  1181. func (ac *addrConn) startHealthCheck(ctx context.Context) {
  1182. var healthcheckManagingState bool
  1183. defer func() {
  1184. if !healthcheckManagingState {
  1185. ac.updateConnectivityState(connectivity.Ready, nil)
  1186. }
  1187. }()
  1188. if ac.cc.dopts.disableHealthCheck {
  1189. return
  1190. }
  1191. healthCheckConfig := ac.cc.healthCheckConfig()
  1192. if healthCheckConfig == nil {
  1193. return
  1194. }
  1195. if !ac.scopts.HealthCheckEnabled {
  1196. return
  1197. }
  1198. healthCheckFunc := ac.cc.dopts.healthCheckFunc
  1199. if healthCheckFunc == nil {
  1200. // The health package is not imported to set health check function.
  1201. //
  1202. // TODO: add a link to the health check doc in the error message.
  1203. channelz.Error(ac.channelzID, "Health check is requested but health check function is not set.")
  1204. return
  1205. }
  1206. healthcheckManagingState = true
  1207. // Set up the health check helper functions.
  1208. currentTr := ac.transport
  1209. newStream := func(method string) (interface{}, error) {
  1210. ac.mu.Lock()
  1211. if ac.transport != currentTr {
  1212. ac.mu.Unlock()
  1213. return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
  1214. }
  1215. ac.mu.Unlock()
  1216. return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
  1217. }
  1218. setConnectivityState := func(s connectivity.State, lastErr error) {
  1219. ac.mu.Lock()
  1220. defer ac.mu.Unlock()
  1221. if ac.transport != currentTr {
  1222. return
  1223. }
  1224. ac.updateConnectivityState(s, lastErr)
  1225. }
  1226. // Start the health checking stream.
  1227. go func() {
  1228. err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
  1229. if err != nil {
  1230. if status.Code(err) == codes.Unimplemented {
  1231. channelz.Error(ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
  1232. } else {
  1233. channelz.Errorf(ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
  1234. }
  1235. }
  1236. }()
  1237. }
  1238. func (ac *addrConn) resetConnectBackoff() {
  1239. ac.mu.Lock()
  1240. close(ac.resetBackoff)
  1241. ac.backoffIdx = 0
  1242. ac.resetBackoff = make(chan struct{})
  1243. ac.mu.Unlock()
  1244. }
  1245. // getReadyTransport returns the transport if ac's state is READY.
  1246. // Otherwise it returns nil, false.
  1247. // If ac's state is IDLE, it will trigger ac to connect.
  1248. func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
  1249. ac.mu.Lock()
  1250. if ac.state == connectivity.Ready && ac.transport != nil {
  1251. t := ac.transport
  1252. ac.mu.Unlock()
  1253. return t, true
  1254. }
  1255. var idle bool
  1256. if ac.state == connectivity.Idle {
  1257. idle = true
  1258. }
  1259. ac.mu.Unlock()
  1260. // Trigger idle ac to connect.
  1261. if idle {
  1262. ac.connect()
  1263. }
  1264. return nil, false
  1265. }
  1266. // tearDown starts to tear down the addrConn.
  1267. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  1268. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  1269. // tight loop.
  1270. // tearDown doesn't remove ac from ac.cc.conns.
  1271. func (ac *addrConn) tearDown(err error) {
  1272. ac.mu.Lock()
  1273. if ac.state == connectivity.Shutdown {
  1274. ac.mu.Unlock()
  1275. return
  1276. }
  1277. curTr := ac.transport
  1278. ac.transport = nil
  1279. // We have to set the state to Shutdown before anything else to prevent races
  1280. // between setting the state and logic that waits on context cancellation / etc.
  1281. ac.updateConnectivityState(connectivity.Shutdown, nil)
  1282. ac.cancel()
  1283. ac.curAddr = resolver.Address{}
  1284. if err == errConnDrain && curTr != nil {
  1285. // GracefulClose(...) may be executed multiple times when
  1286. // i) receiving multiple GoAway frames from the server; or
  1287. // ii) there are concurrent name resolver/Balancer triggered
  1288. // address removal and GoAway.
  1289. // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
  1290. ac.mu.Unlock()
  1291. curTr.GracefulClose()
  1292. ac.mu.Lock()
  1293. }
  1294. if channelz.IsOn() {
  1295. channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
  1296. Desc: "Subchannel Deleted",
  1297. Severity: channelz.CtINFO,
  1298. Parent: &channelz.TraceEventDesc{
  1299. Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
  1300. Severity: channelz.CtINFO,
  1301. },
  1302. })
  1303. // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
  1304. // the entity being deleted, and thus prevent it from being deleted right away.
  1305. channelz.RemoveEntry(ac.channelzID)
  1306. }
  1307. ac.mu.Unlock()
  1308. }
  1309. func (ac *addrConn) getState() connectivity.State {
  1310. ac.mu.Lock()
  1311. defer ac.mu.Unlock()
  1312. return ac.state
  1313. }
  1314. func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
  1315. ac.mu.Lock()
  1316. addr := ac.curAddr.Addr
  1317. ac.mu.Unlock()
  1318. return &channelz.ChannelInternalMetric{
  1319. State: ac.getState(),
  1320. Target: addr,
  1321. CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
  1322. CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
  1323. CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
  1324. LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
  1325. }
  1326. }
  1327. func (ac *addrConn) incrCallsStarted() {
  1328. atomic.AddInt64(&ac.czData.callsStarted, 1)
  1329. atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
  1330. }
  1331. func (ac *addrConn) incrCallsSucceeded() {
  1332. atomic.AddInt64(&ac.czData.callsSucceeded, 1)
  1333. }
  1334. func (ac *addrConn) incrCallsFailed() {
  1335. atomic.AddInt64(&ac.czData.callsFailed, 1)
  1336. }
  1337. type retryThrottler struct {
  1338. max float64
  1339. thresh float64
  1340. ratio float64
  1341. mu sync.Mutex
  1342. tokens float64 // TODO(dfawley): replace with atomic and remove lock.
  1343. }
  1344. // throttle subtracts a retry token from the pool and returns whether a retry
  1345. // should be throttled (disallowed) based upon the retry throttling policy in
  1346. // the service config.
  1347. func (rt *retryThrottler) throttle() bool {
  1348. if rt == nil {
  1349. return false
  1350. }
  1351. rt.mu.Lock()
  1352. defer rt.mu.Unlock()
  1353. rt.tokens--
  1354. if rt.tokens < 0 {
  1355. rt.tokens = 0
  1356. }
  1357. return rt.tokens <= rt.thresh
  1358. }
  1359. func (rt *retryThrottler) successfulRPC() {
  1360. if rt == nil {
  1361. return
  1362. }
  1363. rt.mu.Lock()
  1364. defer rt.mu.Unlock()
  1365. rt.tokens += rt.ratio
  1366. if rt.tokens > rt.max {
  1367. rt.tokens = rt.max
  1368. }
  1369. }
  1370. type channelzChannel struct {
  1371. cc *ClientConn
  1372. }
  1373. func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
  1374. return c.cc.channelzMetric()
  1375. }
  1376. // ErrClientConnTimeout indicates that the ClientConn cannot establish the
  1377. // underlying connections within the specified timeout.
  1378. //
  1379. // Deprecated: This error is never returned by grpc and should not be
  1380. // referenced by users.
  1381. var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
  1382. func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
  1383. for _, rb := range cc.dopts.resolvers {
  1384. if cc.parsedTarget.Scheme == rb.Scheme() {
  1385. return rb
  1386. }
  1387. }
  1388. return resolver.Get(cc.parsedTarget.Scheme)
  1389. }