| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 | 
							- /*
 
-  *
 
-  * Copyright 2017 gRPC authors.
 
-  *
 
-  * Licensed under the Apache License, Version 2.0 (the "License");
 
-  * you may not use this file except in compliance with the License.
 
-  * You may obtain a copy of the License at
 
-  *
 
-  *     http://www.apache.org/licenses/LICENSE-2.0
 
-  *
 
-  * Unless required by applicable law or agreed to in writing, software
 
-  * distributed under the License is distributed on an "AS IS" BASIS,
 
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
-  * See the License for the specific language governing permissions and
 
-  * limitations under the License.
 
-  *
 
-  */
 
- package grpc
 
- import (
 
- 	"fmt"
 
- 	"sync"
 
- 	"google.golang.org/grpc/balancer"
 
- 	"google.golang.org/grpc/connectivity"
 
- 	"google.golang.org/grpc/internal/buffer"
 
- 	"google.golang.org/grpc/internal/channelz"
 
- 	"google.golang.org/grpc/internal/grpcsync"
 
- 	"google.golang.org/grpc/resolver"
 
- )
 
- // scStateUpdate contains the subConn and the new state it changed to.
 
- type scStateUpdate struct {
 
- 	sc    balancer.SubConn
 
- 	state connectivity.State
 
- 	err   error
 
- }
 
- // ccBalancerWrapper is a wrapper on top of cc for balancers.
 
- // It implements balancer.ClientConn interface.
 
- type ccBalancerWrapper struct {
 
- 	cc         *ClientConn
 
- 	balancerMu sync.Mutex // synchronizes calls to the balancer
 
- 	balancer   balancer.Balancer
 
- 	scBuffer   *buffer.Unbounded
 
- 	done       *grpcsync.Event
 
- 	mu       sync.Mutex
 
- 	subConns map[*acBalancerWrapper]struct{}
 
- }
 
- func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
 
- 	ccb := &ccBalancerWrapper{
 
- 		cc:       cc,
 
- 		scBuffer: buffer.NewUnbounded(),
 
- 		done:     grpcsync.NewEvent(),
 
- 		subConns: make(map[*acBalancerWrapper]struct{}),
 
- 	}
 
- 	go ccb.watcher()
 
- 	ccb.balancer = b.Build(ccb, bopts)
 
- 	return ccb
 
- }
 
- // watcher balancer functions sequentially, so the balancer can be implemented
 
- // lock-free.
 
- func (ccb *ccBalancerWrapper) watcher() {
 
- 	for {
 
- 		select {
 
- 		case t := <-ccb.scBuffer.Get():
 
- 			ccb.scBuffer.Load()
 
- 			if ccb.done.HasFired() {
 
- 				break
 
- 			}
 
- 			ccb.balancerMu.Lock()
 
- 			su := t.(*scStateUpdate)
 
- 			if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
 
- 				ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
 
- 			} else {
 
- 				ccb.balancer.HandleSubConnStateChange(su.sc, su.state)
 
- 			}
 
- 			ccb.balancerMu.Unlock()
 
- 		case <-ccb.done.Done():
 
- 		}
 
- 		if ccb.done.HasFired() {
 
- 			ccb.balancer.Close()
 
- 			ccb.mu.Lock()
 
- 			scs := ccb.subConns
 
- 			ccb.subConns = nil
 
- 			ccb.mu.Unlock()
 
- 			for acbw := range scs {
 
- 				ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
 
- 			}
 
- 			ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
 
- 			return
 
- 		}
 
- 	}
 
- }
 
- func (ccb *ccBalancerWrapper) close() {
 
- 	ccb.done.Fire()
 
- }
 
- func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
 
- 	// When updating addresses for a SubConn, if the address in use is not in
 
- 	// the new addresses, the old ac will be tearDown() and a new ac will be
 
- 	// created. tearDown() generates a state change with Shutdown state, we
 
- 	// don't want the balancer to receive this state change. So before
 
- 	// tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
 
- 	// this function will be called with (nil, Shutdown). We don't need to call
 
- 	// balancer method in this case.
 
- 	if sc == nil {
 
- 		return
 
- 	}
 
- 	ccb.scBuffer.Put(&scStateUpdate{
 
- 		sc:    sc,
 
- 		state: s,
 
- 		err:   err,
 
- 	})
 
- }
 
- func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
 
- 	ccb.balancerMu.Lock()
 
- 	defer ccb.balancerMu.Unlock()
 
- 	if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
 
- 		return ub.UpdateClientConnState(*ccs)
 
- 	}
 
- 	ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil)
 
- 	return nil
 
- }
 
- func (ccb *ccBalancerWrapper) resolverError(err error) {
 
- 	if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
 
- 		ccb.balancerMu.Lock()
 
- 		ub.ResolverError(err)
 
- 		ccb.balancerMu.Unlock()
 
- 	}
 
- }
 
- func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
 
- 	if len(addrs) <= 0 {
 
- 		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
 
- 	}
 
- 	ccb.mu.Lock()
 
- 	defer ccb.mu.Unlock()
 
- 	if ccb.subConns == nil {
 
- 		return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
 
- 	}
 
- 	ac, err := ccb.cc.newAddrConn(addrs, opts)
 
- 	if err != nil {
 
- 		return nil, err
 
- 	}
 
- 	acbw := &acBalancerWrapper{ac: ac}
 
- 	acbw.ac.mu.Lock()
 
- 	ac.acbw = acbw
 
- 	acbw.ac.mu.Unlock()
 
- 	ccb.subConns[acbw] = struct{}{}
 
- 	return acbw, nil
 
- }
 
- func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
 
- 	acbw, ok := sc.(*acBalancerWrapper)
 
- 	if !ok {
 
- 		return
 
- 	}
 
- 	ccb.mu.Lock()
 
- 	defer ccb.mu.Unlock()
 
- 	if ccb.subConns == nil {
 
- 		return
 
- 	}
 
- 	delete(ccb.subConns, acbw)
 
- 	ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
 
- }
 
- func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
 
- 	ccb.mu.Lock()
 
- 	defer ccb.mu.Unlock()
 
- 	if ccb.subConns == nil {
 
- 		return
 
- 	}
 
- 	// Update picker before updating state.  Even though the ordering here does
 
- 	// not matter, it can lead to multiple calls of Pick in the common start-up
 
- 	// case where we wait for ready and then perform an RPC.  If the picker is
 
- 	// updated later, we could call the "connecting" picker when the state is
 
- 	// updated, and then call the "ready" picker after the picker gets updated.
 
- 	ccb.cc.blockingpicker.updatePicker(p)
 
- 	ccb.cc.csMgr.updateState(s)
 
- }
 
- func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
 
- 	ccb.mu.Lock()
 
- 	defer ccb.mu.Unlock()
 
- 	if ccb.subConns == nil {
 
- 		return
 
- 	}
 
- 	// Update picker before updating state.  Even though the ordering here does
 
- 	// not matter, it can lead to multiple calls of Pick in the common start-up
 
- 	// case where we wait for ready and then perform an RPC.  If the picker is
 
- 	// updated later, we could call the "connecting" picker when the state is
 
- 	// updated, and then call the "ready" picker after the picker gets updated.
 
- 	ccb.cc.blockingpicker.updatePickerV2(s.Picker)
 
- 	ccb.cc.csMgr.updateState(s.ConnectivityState)
 
- }
 
- func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
 
- 	ccb.cc.resolveNow(o)
 
- }
 
- func (ccb *ccBalancerWrapper) Target() string {
 
- 	return ccb.cc.target
 
- }
 
- // acBalancerWrapper is a wrapper on top of ac for balancers.
 
- // It implements balancer.SubConn interface.
 
- type acBalancerWrapper struct {
 
- 	mu sync.Mutex
 
- 	ac *addrConn
 
- }
 
- func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
 
- 	acbw.mu.Lock()
 
- 	defer acbw.mu.Unlock()
 
- 	if len(addrs) <= 0 {
 
- 		acbw.ac.tearDown(errConnDrain)
 
- 		return
 
- 	}
 
- 	if !acbw.ac.tryUpdateAddrs(addrs) {
 
- 		cc := acbw.ac.cc
 
- 		opts := acbw.ac.scopts
 
- 		acbw.ac.mu.Lock()
 
- 		// Set old ac.acbw to nil so the Shutdown state update will be ignored
 
- 		// by balancer.
 
- 		//
 
- 		// TODO(bar) the state transition could be wrong when tearDown() old ac
 
- 		// and creating new ac, fix the transition.
 
- 		acbw.ac.acbw = nil
 
- 		acbw.ac.mu.Unlock()
 
- 		acState := acbw.ac.getState()
 
- 		acbw.ac.tearDown(errConnDrain)
 
- 		if acState == connectivity.Shutdown {
 
- 			return
 
- 		}
 
- 		ac, err := cc.newAddrConn(addrs, opts)
 
- 		if err != nil {
 
- 			channelz.Warningf(acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
 
- 			return
 
- 		}
 
- 		acbw.ac = ac
 
- 		ac.mu.Lock()
 
- 		ac.acbw = acbw
 
- 		ac.mu.Unlock()
 
- 		if acState != connectivity.Idle {
 
- 			ac.connect()
 
- 		}
 
- 	}
 
- }
 
- func (acbw *acBalancerWrapper) Connect() {
 
- 	acbw.mu.Lock()
 
- 	defer acbw.mu.Unlock()
 
- 	acbw.ac.connect()
 
- }
 
- func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
 
- 	acbw.mu.Lock()
 
- 	defer acbw.mu.Unlock()
 
- 	return acbw.ac
 
- }
 
 
  |