| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533 | /* * * Copyright 2014 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */package grpcimport (	"context"	"errors"	"fmt"	"math"	"net"	"reflect"	"strings"	"sync"	"sync/atomic"	"time"	"google.golang.org/grpc/balancer"	"google.golang.org/grpc/balancer/base"	"google.golang.org/grpc/codes"	"google.golang.org/grpc/connectivity"	"google.golang.org/grpc/credentials"	"google.golang.org/grpc/internal/backoff"	"google.golang.org/grpc/internal/channelz"	"google.golang.org/grpc/internal/grpcsync"	"google.golang.org/grpc/internal/grpcutil"	"google.golang.org/grpc/internal/transport"	"google.golang.org/grpc/keepalive"	"google.golang.org/grpc/resolver"	"google.golang.org/grpc/serviceconfig"	"google.golang.org/grpc/status"	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.)const (	// minimum time to give a connection to complete	minConnectTimeout = 20 * time.Second	// must match grpclbName in grpclb/grpclb.go	grpclbName = "grpclb")var (	// ErrClientConnClosing indicates that the operation is illegal because	// the ClientConn is closing.	//	// Deprecated: this error should not be relied upon by users; use the status	// code of Canceled instead.	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.	errConnDrain = errors.New("grpc: the connection is drained")	// errConnClosing indicates that the connection is closing.	errConnClosing = errors.New("grpc: the connection is closing")	// errBalancerClosed indicates that the balancer is closed.	errBalancerClosed = errors.New("grpc: balancer is closed")	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default	// service config.	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid")// The following errors are returned from Dial and DialContextvar (	// errNoTransportSecurity indicates that there is no transport security	// being set for ClientConn. Users should either set one or explicitly	// call WithInsecure DialOption to disable security.	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")	// errTransportCredsAndBundle indicates that creds bundle is used together	// with other individual Transport Credentials.	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")	// errTransportCredentialsMissing indicates that users want to transmit security	// information (e.g., OAuth2 token) which requires secure connection on an insecure	// connection.	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")	// errCredentialsConflict indicates that grpc.WithTransportCredentials()	// and grpc.WithInsecure() are both called for a connection.	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)"))const (	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4	defaultClientMaxSendMessageSize    = math.MaxInt32	// http2IOBufSize specifies the buffer size for sending frames.	defaultWriteBufSize = 32 * 1024	defaultReadBufSize  = 32 * 1024)// Dial creates a client connection to the given target.func Dial(target string, opts ...DialOption) (*ClientConn, error) {	return DialContext(context.Background(), target, opts...)}// DialContext creates a client connection to the given target. By default, it's// a non-blocking dial (the function won't wait for connections to be// established, and connecting happens in the background). To make it a blocking// dial, use WithBlock() dial option.//// In the non-blocking case, the ctx does not act against the connection. It// only controls the setup steps.//// In the blocking case, ctx can be used to cancel or expire the pending// connection. Once this function returns, the cancellation and expiration of// ctx will be noop. Users should call ClientConn.Close to terminate all the// pending operations after this function returns.//// The target name syntax is defined in// https://github.com/grpc/grpc/blob/master/doc/naming.md.// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {	cc := &ClientConn{		target:            target,		csMgr:             &connectivityStateManager{},		conns:             make(map[*addrConn]struct{}),		dopts:             defaultDialOptions(),		blockingpicker:    newPickerWrapper(),		czData:            new(channelzData),		firstResolveEvent: grpcsync.NewEvent(),	}	cc.retryThrottler.Store((*retryThrottler)(nil))	cc.ctx, cc.cancel = context.WithCancel(context.Background())	for _, opt := range opts {		opt.apply(&cc.dopts)	}	chainUnaryClientInterceptors(cc)	chainStreamClientInterceptors(cc)	defer func() {		if err != nil {			cc.Close()		}	}()	if channelz.IsOn() {		if cc.dopts.channelzParentID != 0 {			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)			channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{				Desc:     "Channel Created",				Severity: channelz.CtINFO,				Parent: &channelz.TraceEventDesc{					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),					Severity: channelz.CtINFO,				},			})		} else {			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)			channelz.Info(cc.channelzID, "Channel Created")		}		cc.csMgr.channelzID = cc.channelzID	}	if !cc.dopts.insecure {		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {			return nil, errNoTransportSecurity		}		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {			return nil, errTransportCredsAndBundle		}	} else {		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {			return nil, errCredentialsConflict		}		for _, cd := range cc.dopts.copts.PerRPCCredentials {			if cd.RequireTransportSecurity() {				return nil, errTransportCredentialsMissing			}		}	}	if cc.dopts.defaultServiceConfigRawJSON != nil {		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)		if scpr.Err != nil {			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)		}		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)	}	cc.mkp = cc.dopts.copts.KeepaliveParams	if cc.dopts.copts.Dialer == nil {		cc.dopts.copts.Dialer = newProxyDialer(			func(ctx context.Context, addr string) (net.Conn, error) {				network, addr := parseDialTarget(addr)				return (&net.Dialer{}).DialContext(ctx, network, addr)			},		)	}	if cc.dopts.copts.UserAgent != "" {		cc.dopts.copts.UserAgent += " " + grpcUA	} else {		cc.dopts.copts.UserAgent = grpcUA	}	if cc.dopts.timeout > 0 {		var cancel context.CancelFunc		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)		defer cancel()	}	defer func() {		select {		case <-ctx.Done():			conn, err = nil, ctx.Err()		default:		}	}()	scSet := false	if cc.dopts.scChan != nil {		// Try to get an initial service config.		select {		case sc, ok := <-cc.dopts.scChan:			if ok {				cc.sc = &sc				scSet = true			}		default:		}	}	if cc.dopts.bs == nil {		cc.dopts.bs = backoff.DefaultExponential	}	// Determine the resolver to use.	cc.parsedTarget = grpcutil.ParseTarget(cc.target)	channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)	if resolverBuilder == nil {		// If resolver builder is still nil, the parsed target's scheme is		// not registered. Fallback to default resolver and set Endpoint to		// the original target.		channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)		cc.parsedTarget = resolver.Target{			Scheme:   resolver.GetDefaultScheme(),			Endpoint: target,		}		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)		if resolverBuilder == nil {			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)		}	}	creds := cc.dopts.copts.TransportCredentials	if creds != nil && creds.Info().ServerName != "" {		cc.authority = creds.Info().ServerName	} else if cc.dopts.insecure && cc.dopts.authority != "" {		cc.authority = cc.dopts.authority	} else {		// Use endpoint from "scheme://authority/endpoint" as the default		// authority for ClientConn.		cc.authority = cc.parsedTarget.Endpoint	}	if cc.dopts.scChan != nil && !scSet {		// Blocking wait for the initial service config.		select {		case sc, ok := <-cc.dopts.scChan:			if ok {				cc.sc = &sc			}		case <-ctx.Done():			return nil, ctx.Err()		}	}	if cc.dopts.scChan != nil {		go cc.scWatcher()	}	var credsClone credentials.TransportCredentials	if creds := cc.dopts.copts.TransportCredentials; creds != nil {		credsClone = creds.Clone()	}	cc.balancerBuildOpts = balancer.BuildOptions{		DialCreds:        credsClone,		CredsBundle:      cc.dopts.copts.CredsBundle,		Dialer:           cc.dopts.copts.Dialer,		ChannelzParentID: cc.channelzID,		Target:           cc.parsedTarget,	}	// Build the resolver.	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)	if err != nil {		return nil, fmt.Errorf("failed to build resolver: %v", err)	}	cc.mu.Lock()	cc.resolverWrapper = rWrapper	cc.mu.Unlock()	// A blocking dial blocks until the clientConn is ready.	if cc.dopts.block {		for {			s := cc.GetState()			if s == connectivity.Ready {				break			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {				if err = cc.blockingpicker.connectionError(); err != nil {					terr, ok := err.(interface {						Temporary() bool					})					if ok && !terr.Temporary() {						return nil, err					}				}			}			if !cc.WaitForStateChange(ctx, s) {				// ctx got timeout or canceled.				return nil, ctx.Err()			}		}	}	return cc, nil}// chainUnaryClientInterceptors chains all unary client interceptors into one.func chainUnaryClientInterceptors(cc *ClientConn) {	interceptors := cc.dopts.chainUnaryInts	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will	// be executed before any other chained interceptors.	if cc.dopts.unaryInt != nil {		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)	}	var chainedInt UnaryClientInterceptor	if len(interceptors) == 0 {		chainedInt = nil	} else if len(interceptors) == 1 {		chainedInt = interceptors[0]	} else {		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)		}	}	cc.dopts.unaryInt = chainedInt}// getChainUnaryInvoker recursively generate the chained unary invoker.func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {	if curr == len(interceptors)-1 {		return finalInvoker	}	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)	}}// chainStreamClientInterceptors chains all stream client interceptors into one.func chainStreamClientInterceptors(cc *ClientConn) {	interceptors := cc.dopts.chainStreamInts	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will	// be executed before any other chained interceptors.	if cc.dopts.streamInt != nil {		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)	}	var chainedInt StreamClientInterceptor	if len(interceptors) == 0 {		chainedInt = nil	} else if len(interceptors) == 1 {		chainedInt = interceptors[0]	} else {		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)		}	}	cc.dopts.streamInt = chainedInt}// getChainStreamer recursively generate the chained client stream constructor.func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {	if curr == len(interceptors)-1 {		return finalStreamer	}	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)	}}// connectivityStateManager keeps the connectivity.State of ClientConn.// This struct will eventually be exported so the balancers can access it.type connectivityStateManager struct {	mu         sync.Mutex	state      connectivity.State	notifyChan chan struct{}	channelzID int64}// updateState updates the connectivity.State of ClientConn.// If there's a change it notifies goroutines waiting on state change to// happen.func (csm *connectivityStateManager) updateState(state connectivity.State) {	csm.mu.Lock()	defer csm.mu.Unlock()	if csm.state == connectivity.Shutdown {		return	}	if csm.state == state {		return	}	csm.state = state	channelz.Infof(csm.channelzID, "Channel Connectivity change to %v", state)	if csm.notifyChan != nil {		// There are other goroutines waiting on this channel.		close(csm.notifyChan)		csm.notifyChan = nil	}}func (csm *connectivityStateManager) getState() connectivity.State {	csm.mu.Lock()	defer csm.mu.Unlock()	return csm.state}func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {	csm.mu.Lock()	defer csm.mu.Unlock()	if csm.notifyChan == nil {		csm.notifyChan = make(chan struct{})	}	return csm.notifyChan}// ClientConnInterface defines the functions clients need to perform unary and// streaming RPCs.  It is implemented by *ClientConn, and is only intended to// be referenced by generated code.type ClientConnInterface interface {	// Invoke performs a unary RPC and returns after the response is received	// into reply.	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error	// NewStream begins a streaming RPC.	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)}// Assert *ClientConn implements ClientConnInterface.var _ ClientConnInterface = (*ClientConn)(nil)// ClientConn represents a virtual connection to a conceptual endpoint, to// perform RPCs.//// A ClientConn is free to have zero or more actual connections to the endpoint// based on configuration, load, etc. It is also free to determine which actual// endpoints to use and may change it every RPC, permitting client-side load// balancing.//// A ClientConn encapsulates a range of functionality including name// resolution, TCP connection establishment (with retries and backoff) and TLS// handshakes. It also handles errors on established connections by// re-resolving the name and reconnecting.type ClientConn struct {	ctx    context.Context	cancel context.CancelFunc	target       string	parsedTarget resolver.Target	authority    string	dopts        dialOptions	csMgr        *connectivityStateManager	balancerBuildOpts balancer.BuildOptions	blockingpicker    *pickerWrapper	mu              sync.RWMutex	resolverWrapper *ccResolverWrapper	sc              *ServiceConfig	conns           map[*addrConn]struct{}	// Keepalive parameter can be updated if a GoAway is received.	mkp             keepalive.ClientParameters	curBalancerName string	balancerWrapper *ccBalancerWrapper	retryThrottler  atomic.Value	firstResolveEvent *grpcsync.Event	channelzID int64 // channelz unique identification number	czData     *channelzData}// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or// ctx expires. A true value is returned in former case and false in latter.// This is an EXPERIMENTAL API.func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {	ch := cc.csMgr.getNotifyChan()	if cc.csMgr.getState() != sourceState {		return true	}	select {	case <-ctx.Done():		return false	case <-ch:		return true	}}// GetState returns the connectivity.State of ClientConn.// This is an EXPERIMENTAL API.func (cc *ClientConn) GetState() connectivity.State {	return cc.csMgr.getState()}func (cc *ClientConn) scWatcher() {	for {		select {		case sc, ok := <-cc.dopts.scChan:			if !ok {				return			}			cc.mu.Lock()			// TODO: load balance policy runtime change is ignored.			// We may revisit this decision in the future.			cc.sc = &sc			cc.mu.Unlock()		case <-cc.ctx.Done():			return		}	}}// waitForResolvedAddrs blocks until the resolver has provided addresses or the// context expires.  Returns nil unless the context expires first; otherwise// returns a status error based on the context.func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {	// This is on the RPC path, so we use a fast path to avoid the	// more-expensive "select" below after the resolver has returned once.	if cc.firstResolveEvent.HasFired() {		return nil	}	select {	case <-cc.firstResolveEvent.Done():		return nil	case <-ctx.Done():		return status.FromContextError(ctx.Err()).Err()	case <-cc.ctx.Done():		return ErrClientConnClosing	}}var emptyServiceConfig *ServiceConfigfunc init() {	cfg := parseServiceConfig("{}")	if cfg.Err != nil {		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))	}	emptyServiceConfig = cfg.Config.(*ServiceConfig)}func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {	if cc.sc != nil {		cc.applyServiceConfigAndBalancer(cc.sc, addrs)		return	}	if cc.dopts.defaultServiceConfig != nil {		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)	} else {		cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)	}}func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {	defer cc.firstResolveEvent.Fire()	cc.mu.Lock()	// Check if the ClientConn is already closed. Some fields (e.g.	// balancerWrapper) are set to nil when closing the ClientConn, and could	// cause nil pointer panic if we don't have this check.	if cc.conns == nil {		cc.mu.Unlock()		return nil	}	if err != nil {		// May need to apply the initial service config in case the resolver		// doesn't support service configs, or doesn't provide a service config		// with the new addresses.		cc.maybeApplyDefaultServiceConfig(nil)		if cc.balancerWrapper != nil {			cc.balancerWrapper.resolverError(err)		}		// No addresses are valid with err set; return early.		cc.mu.Unlock()		return balancer.ErrBadResolverState	}	var ret error	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {		cc.maybeApplyDefaultServiceConfig(s.Addresses)		// TODO: do we need to apply a failing LB policy if there is no		// default, per the error handling design?	} else {		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {			cc.applyServiceConfigAndBalancer(sc, s.Addresses)		} else {			ret = balancer.ErrBadResolverState			if cc.balancerWrapper == nil {				var err error				if s.ServiceConfig.Err != nil {					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)				} else {					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)				}				cc.blockingpicker.updatePicker(base.NewErrPicker(err))				cc.csMgr.updateState(connectivity.TransientFailure)				cc.mu.Unlock()				return ret			}		}	}	var balCfg serviceconfig.LoadBalancingConfig	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {		balCfg = cc.sc.lbConfig.cfg	}	cbn := cc.curBalancerName	bw := cc.balancerWrapper	cc.mu.Unlock()	if cbn != grpclbName {		// Filter any grpclb addresses since we don't have the grpclb balancer.		for i := 0; i < len(s.Addresses); {			if s.Addresses[i].Type == resolver.GRPCLB {				copy(s.Addresses[i:], s.Addresses[i+1:])				s.Addresses = s.Addresses[:len(s.Addresses)-1]				continue			}			i++		}	}	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})	if ret == nil {		ret = uccsErr // prefer ErrBadResolver state since any other error is		// currently meaningless to the caller.	}	return ret}// switchBalancer starts the switching from current balancer to the balancer// with the given name.//// It will NOT send the current address list to the new balancer. If needed,// caller of this function should send address list to the new balancer after// this function returns.//// Caller must hold cc.mu.func (cc *ClientConn) switchBalancer(name string) {	if strings.EqualFold(cc.curBalancerName, name) {		return	}	channelz.Infof(cc.channelzID, "ClientConn switching balancer to %q", name)	if cc.dopts.balancerBuilder != nil {		channelz.Info(cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")		return	}	if cc.balancerWrapper != nil {		cc.balancerWrapper.close()	}	builder := balancer.Get(name)	if builder == nil {		channelz.Warningf(cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)		channelz.Infof(cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)		builder = newPickfirstBuilder()	} else {		channelz.Infof(cc.channelzID, "Channel switches to new LB policy %q", name)	}	cc.curBalancerName = builder.Name()	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)}func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {	cc.mu.Lock()	if cc.conns == nil {		cc.mu.Unlock()		return	}	// TODO(bar switching) send updates to all balancer wrappers when balancer	// gracefully switching is supported.	cc.balancerWrapper.handleSubConnStateChange(sc, s, err)	cc.mu.Unlock()}// newAddrConn creates an addrConn for addrs and adds it to cc.conns.//// Caller needs to make sure len(addrs) > 0.func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {	ac := &addrConn{		state:        connectivity.Idle,		cc:           cc,		addrs:        addrs,		scopts:       opts,		dopts:        cc.dopts,		czData:       new(channelzData),		resetBackoff: make(chan struct{}),	}	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)	// Track ac in cc. This needs to be done before any getTransport(...) is called.	cc.mu.Lock()	if cc.conns == nil {		cc.mu.Unlock()		return nil, ErrClientConnClosing	}	if channelz.IsOn() {		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")		channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{			Desc:     "Subchannel Created",			Severity: channelz.CtINFO,			Parent: &channelz.TraceEventDesc{				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),				Severity: channelz.CtINFO,			},		})	}	cc.conns[ac] = struct{}{}	cc.mu.Unlock()	return ac, nil}// removeAddrConn removes the addrConn in the subConn from clientConn.// It also tears down the ac with the given error.func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {	cc.mu.Lock()	if cc.conns == nil {		cc.mu.Unlock()		return	}	delete(cc.conns, ac)	cc.mu.Unlock()	ac.tearDown(err)}func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {	return &channelz.ChannelInternalMetric{		State:                    cc.GetState(),		Target:                   cc.target,		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),	}}// Target returns the target string of the ClientConn.// This is an EXPERIMENTAL API.func (cc *ClientConn) Target() string {	return cc.target}func (cc *ClientConn) incrCallsStarted() {	atomic.AddInt64(&cc.czData.callsStarted, 1)	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())}func (cc *ClientConn) incrCallsSucceeded() {	atomic.AddInt64(&cc.czData.callsSucceeded, 1)}func (cc *ClientConn) incrCallsFailed() {	atomic.AddInt64(&cc.czData.callsFailed, 1)}// connect starts creating a transport.// It does nothing if the ac is not IDLE.// TODO(bar) Move this to the addrConn section.func (ac *addrConn) connect() error {	ac.mu.Lock()	if ac.state == connectivity.Shutdown {		ac.mu.Unlock()		return errConnClosing	}	if ac.state != connectivity.Idle {		ac.mu.Unlock()		return nil	}	// Update connectivity state within the lock to prevent subsequent or	// concurrent calls from resetting the transport more than once.	ac.updateConnectivityState(connectivity.Connecting, nil)	ac.mu.Unlock()	// Start a goroutine connecting to the server asynchronously.	go ac.resetTransport()	return nil}// tryUpdateAddrs tries to update ac.addrs with the new addresses list.//// If ac is Connecting, it returns false. The caller should tear down the ac and// create a new one. Note that the backoff will be reset when this happens.//// If ac is TransientFailure, it updates ac.addrs and returns true. The updated// addresses will be picked up by retry in the next iteration after backoff.//// If ac is Shutdown or Idle, it updates ac.addrs and returns true.//// If ac is Ready, it checks whether current connected address of ac is in the// new addrs list.//  - If true, it updates ac.addrs and returns true. The ac will keep using//    the existing connection.//  - If false, it does nothing and returns false.func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {	ac.mu.Lock()	defer ac.mu.Unlock()	channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)	if ac.state == connectivity.Shutdown ||		ac.state == connectivity.TransientFailure ||		ac.state == connectivity.Idle {		ac.addrs = addrs		return true	}	if ac.state == connectivity.Connecting {		return false	}	// ac.state is Ready, try to find the connected address.	var curAddrFound bool	for _, a := range addrs {		if reflect.DeepEqual(ac.curAddr, a) {			curAddrFound = true			break		}	}	channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)	if curAddrFound {		ac.addrs = addrs	}	return curAddrFound}// GetMethodConfig gets the method config of the input method.// If there's an exact match for input method (i.e. /service/method), we return// the corresponding MethodConfig.// If there isn't an exact match for the input method, we look for the default config// under the service (i.e /service/). If there is a default MethodConfig for// the service, we return it.// Otherwise, we return an empty MethodConfig.func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {	// TODO: Avoid the locking here.	cc.mu.RLock()	defer cc.mu.RUnlock()	if cc.sc == nil {		return MethodConfig{}	}	m, ok := cc.sc.Methods[method]	if !ok {		i := strings.LastIndex(method, "/")		m = cc.sc.Methods[method[:i+1]]	}	return m}func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {	cc.mu.RLock()	defer cc.mu.RUnlock()	if cc.sc == nil {		return nil	}	return cc.sc.healthCheckConfig}func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{		Ctx:            ctx,		FullMethodName: method,	})	if err != nil {		return nil, nil, toRPCErr(err)	}	return t, done, nil}func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {	if sc == nil {		// should never reach here.		return	}	cc.sc = sc	if cc.sc.retryThrottling != nil {		newThrottler := &retryThrottler{			tokens: cc.sc.retryThrottling.MaxTokens,			max:    cc.sc.retryThrottling.MaxTokens,			thresh: cc.sc.retryThrottling.MaxTokens / 2,			ratio:  cc.sc.retryThrottling.TokenRatio,		}		cc.retryThrottler.Store(newThrottler)	} else {		cc.retryThrottler.Store((*retryThrottler)(nil))	}	if cc.dopts.balancerBuilder == nil {		// Only look at balancer types and switch balancer if balancer dial		// option is not set.		var newBalancerName string		if cc.sc != nil && cc.sc.lbConfig != nil {			newBalancerName = cc.sc.lbConfig.name		} else {			var isGRPCLB bool			for _, a := range addrs {				if a.Type == resolver.GRPCLB {					isGRPCLB = true					break				}			}			if isGRPCLB {				newBalancerName = grpclbName			} else if cc.sc != nil && cc.sc.LB != nil {				newBalancerName = *cc.sc.LB			} else {				newBalancerName = PickFirstBalancerName			}		}		cc.switchBalancer(newBalancerName)	} else if cc.balancerWrapper == nil {		// Balancer dial option was set, and this is the first time handling		// resolved addresses. Build a balancer with dopts.balancerBuilder.		cc.curBalancerName = cc.dopts.balancerBuilder.Name()		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)	}}func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {	cc.mu.RLock()	r := cc.resolverWrapper	cc.mu.RUnlock()	if r == nil {		return	}	go r.resolveNow(o)}// ResetConnectBackoff wakes up all subchannels in transient failure and causes// them to attempt another connection immediately.  It also resets the backoff// times used for subsequent attempts regardless of the current state.//// In general, this function should not be used.  Typical service or network// outages result in a reasonable client reconnection strategy by default.// However, if a previously unavailable network becomes available, this may be// used to trigger an immediate reconnect.//// This API is EXPERIMENTAL.func (cc *ClientConn) ResetConnectBackoff() {	cc.mu.Lock()	conns := cc.conns	cc.mu.Unlock()	for ac := range conns {		ac.resetConnectBackoff()	}}// Close tears down the ClientConn and all underlying connections.func (cc *ClientConn) Close() error {	defer cc.cancel()	cc.mu.Lock()	if cc.conns == nil {		cc.mu.Unlock()		return ErrClientConnClosing	}	conns := cc.conns	cc.conns = nil	cc.csMgr.updateState(connectivity.Shutdown)	rWrapper := cc.resolverWrapper	cc.resolverWrapper = nil	bWrapper := cc.balancerWrapper	cc.balancerWrapper = nil	cc.mu.Unlock()	cc.blockingpicker.close()	if rWrapper != nil {		rWrapper.close()	}	if bWrapper != nil {		bWrapper.close()	}	for ac := range conns {		ac.tearDown(ErrClientConnClosing)	}	if channelz.IsOn() {		ted := &channelz.TraceEventDesc{			Desc:     "Channel Deleted",			Severity: channelz.CtINFO,		}		if cc.dopts.channelzParentID != 0 {			ted.Parent = &channelz.TraceEventDesc{				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),				Severity: channelz.CtINFO,			}		}		channelz.AddTraceEvent(cc.channelzID, 0, ted)		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to		// the entity being deleted, and thus prevent it from being deleted right away.		channelz.RemoveEntry(cc.channelzID)	}	return nil}// addrConn is a network connection to a given address.type addrConn struct {	ctx    context.Context	cancel context.CancelFunc	cc     *ClientConn	dopts  dialOptions	acbw   balancer.SubConn	scopts balancer.NewSubConnOptions	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel	// health checking may require server to report healthy to set ac to READY), and is reset	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway	// is received, transport is closed, ac has been torn down).	transport transport.ClientTransport // The current transport.	mu      sync.Mutex	curAddr resolver.Address   // The current address.	addrs   []resolver.Address // All addresses that the resolver resolved to.	// Use updateConnectivityState for updating addrConn's connectivity state.	state connectivity.State	backoffIdx   int // Needs to be stateful for resetConnectBackoff.	resetBackoff chan struct{}	channelzID int64 // channelz unique identification number.	czData     *channelzData}// Note: this requires a lock on ac.mu.func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {	if ac.state == s {		return	}	ac.state = s	channelz.Infof(ac.channelzID, "Subchannel Connectivity change to %v", s)	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)}// adjustParams updates parameters used to create transports upon// receiving a GoAway.func (ac *addrConn) adjustParams(r transport.GoAwayReason) {	switch r {	case transport.GoAwayTooManyPings:		v := 2 * ac.dopts.copts.KeepaliveParams.Time		ac.cc.mu.Lock()		if v > ac.cc.mkp.Time {			ac.cc.mkp.Time = v		}		ac.cc.mu.Unlock()	}}func (ac *addrConn) resetTransport() {	for i := 0; ; i++ {		if i > 0 {			ac.cc.resolveNow(resolver.ResolveNowOptions{})		}		ac.mu.Lock()		if ac.state == connectivity.Shutdown {			ac.mu.Unlock()			return		}		addrs := ac.addrs		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)		// This will be the duration that dial gets to finish.		dialDuration := minConnectTimeout		if ac.dopts.minConnectTimeout != nil {			dialDuration = ac.dopts.minConnectTimeout()		}		if dialDuration < backoffFor {			// Give dial more time as we keep failing to connect.			dialDuration = backoffFor		}		// We can potentially spend all the time trying the first address, and		// if the server accepts the connection and then hangs, the following		// addresses will never be tried.		//		// The spec doesn't mention what should be done for multiple addresses.		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm		connectDeadline := time.Now().Add(dialDuration)		ac.updateConnectivityState(connectivity.Connecting, nil)		ac.transport = nil		ac.mu.Unlock()		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)		if err != nil {			// After exhausting all addresses, the addrConn enters			// TRANSIENT_FAILURE.			ac.mu.Lock()			if ac.state == connectivity.Shutdown {				ac.mu.Unlock()				return			}			ac.updateConnectivityState(connectivity.TransientFailure, err)			// Backoff.			b := ac.resetBackoff			ac.mu.Unlock()			timer := time.NewTimer(backoffFor)			select {			case <-timer.C:				ac.mu.Lock()				ac.backoffIdx++				ac.mu.Unlock()			case <-b:				timer.Stop()			case <-ac.ctx.Done():				timer.Stop()				return			}			continue		}		ac.mu.Lock()		if ac.state == connectivity.Shutdown {			ac.mu.Unlock()			newTr.Close()			return		}		ac.curAddr = addr		ac.transport = newTr		ac.backoffIdx = 0		hctx, hcancel := context.WithCancel(ac.ctx)		ac.startHealthCheck(hctx)		ac.mu.Unlock()		// Block until the created transport is down. And when this happens,		// we restart from the top of the addr list.		<-reconnect.Done()		hcancel()		// restart connecting - the top of the loop will set state to		// CONNECTING.  This is against the current connectivity semantics doc,		// however it allows for graceful behavior for RPCs not yet dispatched		// - unfortunate timing would otherwise lead to the RPC failing even		// though the TRANSIENT_FAILURE state (called for by the doc) would be		// instantaneous.		//		// Ideally we should transition to Idle here and block until there is		// RPC activity that leads to the balancer requesting a reconnect of		// the associated SubConn.	}}// tryAllAddrs tries to creates a connection to the addresses, and stop when at the// first successful one. It returns the transport, the address and a Event in// the successful case. The Event fires when the returned transport disconnects.func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {	var firstConnErr error	for _, addr := range addrs {		ac.mu.Lock()		if ac.state == connectivity.Shutdown {			ac.mu.Unlock()			return nil, resolver.Address{}, nil, errConnClosing		}		ac.cc.mu.RLock()		ac.dopts.copts.KeepaliveParams = ac.cc.mkp		ac.cc.mu.RUnlock()		copts := ac.dopts.copts		if ac.scopts.CredsBundle != nil {			copts.CredsBundle = ac.scopts.CredsBundle		}		ac.mu.Unlock()		channelz.Infof(ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)		if err == nil {			return newTr, addr, reconnect, nil		}		if firstConnErr == nil {			firstConnErr = err		}		ac.cc.blockingpicker.updateConnectionError(err)	}	// Couldn't connect to any address.	return nil, resolver.Address{}, nil, firstConnErr}// createTransport creates a connection to addr. It returns the transport and a// Event in the successful case. The Event fires when the returned transport// disconnects.func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {	prefaceReceived := make(chan struct{})	onCloseCalled := make(chan struct{})	reconnect := grpcsync.NewEvent()	authority := ac.cc.authority	// addr.ServerName takes precedent over ClientConn authority, if present.	if addr.ServerName != "" {		authority = addr.ServerName	}	target := transport.TargetInfo{		Addr:      addr.Addr,		Metadata:  addr.Metadata,		Authority: authority,	}	once := sync.Once{}	onGoAway := func(r transport.GoAwayReason) {		ac.mu.Lock()		ac.adjustParams(r)		once.Do(func() {			if ac.state == connectivity.Ready {				// Prevent this SubConn from being used for new RPCs by setting its				// state to Connecting.				//				// TODO: this should be Idle when grpc-go properly supports it.				ac.updateConnectivityState(connectivity.Connecting, nil)			}		})		ac.mu.Unlock()		reconnect.Fire()	}	onClose := func() {		ac.mu.Lock()		once.Do(func() {			if ac.state == connectivity.Ready {				// Prevent this SubConn from being used for new RPCs by setting its				// state to Connecting.				//				// TODO: this should be Idle when grpc-go properly supports it.				ac.updateConnectivityState(connectivity.Connecting, nil)			}		})		ac.mu.Unlock()		close(onCloseCalled)		reconnect.Fire()	}	onPrefaceReceipt := func() {		close(prefaceReceived)	}	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)	defer cancel()	if channelz.IsOn() {		copts.ChannelzParentID = ac.channelzID	}	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)	if err != nil {		// newTr is either nil, or closed.		channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)		return nil, nil, err	}	select {	case <-time.After(time.Until(connectDeadline)):		// We didn't get the preface in time.		newTr.Close()		channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)		return nil, nil, errors.New("timed out waiting for server handshake")	case <-prefaceReceived:		// We got the preface - huzzah! things are good.	case <-onCloseCalled:		// The transport has already closed - noop.		return nil, nil, errors.New("connection closed")		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.	}	return newTr, reconnect, nil}// startHealthCheck starts the health checking stream (RPC) to watch the health// stats of this connection if health checking is requested and configured.//// LB channel health checking is enabled when all requirements below are met:// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package// 3. a service config with non-empty healthCheckConfig field is provided// 4. the load balancer requests it//// It sets addrConn to READY if the health checking stream is not started.//// Caller must hold ac.mu.func (ac *addrConn) startHealthCheck(ctx context.Context) {	var healthcheckManagingState bool	defer func() {		if !healthcheckManagingState {			ac.updateConnectivityState(connectivity.Ready, nil)		}	}()	if ac.cc.dopts.disableHealthCheck {		return	}	healthCheckConfig := ac.cc.healthCheckConfig()	if healthCheckConfig == nil {		return	}	if !ac.scopts.HealthCheckEnabled {		return	}	healthCheckFunc := ac.cc.dopts.healthCheckFunc	if healthCheckFunc == nil {		// The health package is not imported to set health check function.		//		// TODO: add a link to the health check doc in the error message.		channelz.Error(ac.channelzID, "Health check is requested but health check function is not set.")		return	}	healthcheckManagingState = true	// Set up the health check helper functions.	currentTr := ac.transport	newStream := func(method string) (interface{}, error) {		ac.mu.Lock()		if ac.transport != currentTr {			ac.mu.Unlock()			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")		}		ac.mu.Unlock()		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)	}	setConnectivityState := func(s connectivity.State, lastErr error) {		ac.mu.Lock()		defer ac.mu.Unlock()		if ac.transport != currentTr {			return		}		ac.updateConnectivityState(s, lastErr)	}	// Start the health checking stream.	go func() {		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)		if err != nil {			if status.Code(err) == codes.Unimplemented {				channelz.Error(ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")			} else {				channelz.Errorf(ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)			}		}	}()}func (ac *addrConn) resetConnectBackoff() {	ac.mu.Lock()	close(ac.resetBackoff)	ac.backoffIdx = 0	ac.resetBackoff = make(chan struct{})	ac.mu.Unlock()}// getReadyTransport returns the transport if ac's state is READY.// Otherwise it returns nil, false.// If ac's state is IDLE, it will trigger ac to connect.func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {	ac.mu.Lock()	if ac.state == connectivity.Ready && ac.transport != nil {		t := ac.transport		ac.mu.Unlock()		return t, true	}	var idle bool	if ac.state == connectivity.Idle {		idle = true	}	ac.mu.Unlock()	// Trigger idle ac to connect.	if idle {		ac.connect()	}	return nil, false}// tearDown starts to tear down the addrConn.// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in// some edge cases (e.g., the caller opens and closes many addrConn's in a// tight loop.// tearDown doesn't remove ac from ac.cc.conns.func (ac *addrConn) tearDown(err error) {	ac.mu.Lock()	if ac.state == connectivity.Shutdown {		ac.mu.Unlock()		return	}	curTr := ac.transport	ac.transport = nil	// We have to set the state to Shutdown before anything else to prevent races	// between setting the state and logic that waits on context cancellation / etc.	ac.updateConnectivityState(connectivity.Shutdown, nil)	ac.cancel()	ac.curAddr = resolver.Address{}	if err == errConnDrain && curTr != nil {		// GracefulClose(...) may be executed multiple times when		// i) receiving multiple GoAway frames from the server; or		// ii) there are concurrent name resolver/Balancer triggered		// address removal and GoAway.		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.		ac.mu.Unlock()		curTr.GracefulClose()		ac.mu.Lock()	}	if channelz.IsOn() {		channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{			Desc:     "Subchannel Deleted",			Severity: channelz.CtINFO,			Parent: &channelz.TraceEventDesc{				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),				Severity: channelz.CtINFO,			},		})		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to		// the entity being deleted, and thus prevent it from being deleted right away.		channelz.RemoveEntry(ac.channelzID)	}	ac.mu.Unlock()}func (ac *addrConn) getState() connectivity.State {	ac.mu.Lock()	defer ac.mu.Unlock()	return ac.state}func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {	ac.mu.Lock()	addr := ac.curAddr.Addr	ac.mu.Unlock()	return &channelz.ChannelInternalMetric{		State:                    ac.getState(),		Target:                   addr,		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),	}}func (ac *addrConn) incrCallsStarted() {	atomic.AddInt64(&ac.czData.callsStarted, 1)	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())}func (ac *addrConn) incrCallsSucceeded() {	atomic.AddInt64(&ac.czData.callsSucceeded, 1)}func (ac *addrConn) incrCallsFailed() {	atomic.AddInt64(&ac.czData.callsFailed, 1)}type retryThrottler struct {	max    float64	thresh float64	ratio  float64	mu     sync.Mutex	tokens float64 // TODO(dfawley): replace with atomic and remove lock.}// throttle subtracts a retry token from the pool and returns whether a retry// should be throttled (disallowed) based upon the retry throttling policy in// the service config.func (rt *retryThrottler) throttle() bool {	if rt == nil {		return false	}	rt.mu.Lock()	defer rt.mu.Unlock()	rt.tokens--	if rt.tokens < 0 {		rt.tokens = 0	}	return rt.tokens <= rt.thresh}func (rt *retryThrottler) successfulRPC() {	if rt == nil {		return	}	rt.mu.Lock()	defer rt.mu.Unlock()	rt.tokens += rt.ratio	if rt.tokens > rt.max {		rt.tokens = rt.max	}}type channelzChannel struct {	cc *ClientConn}func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {	return c.cc.channelzMetric()}// ErrClientConnTimeout indicates that the ClientConn cannot establish the// underlying connections within the specified timeout.//// Deprecated: This error is never returned by grpc and should not be// referenced by users.var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")func (cc *ClientConn) getResolver(scheme string) resolver.Builder {	for _, rb := range cc.dopts.resolvers {		if cc.parsedTarget.Scheme == rb.Scheme() {			return rb		}	}	return resolver.Get(cc.parsedTarget.Scheme)}
 |