123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- // Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
- package client
- import (
- "context"
- "crypto/tls"
- "encoding/json"
- "errors"
- "fmt"
- "net/http"
- "net/url"
- "slices"
- "sync"
- "time"
- "github.com/syncthing/syncthing/lib/osutil"
- "github.com/syncthing/syncthing/lib/rand"
- "github.com/syncthing/syncthing/lib/relay/protocol"
- )
- type dynamicClient struct {
- commonClient
- pooladdr *url.URL
- certs []tls.Certificate
- timeout time.Duration
- mut sync.RWMutex // Protects client.
- client *staticClient
- }
- func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) *dynamicClient {
- c := &dynamicClient{
- pooladdr: uri,
- certs: certs,
- timeout: timeout,
- }
- c.commonClient = newCommonClient(invitations, c.serve, fmt.Sprintf("dynamicClient@%p", c))
- return c
- }
- func (c *dynamicClient) serve(ctx context.Context) error {
- uri := *c.pooladdr
- // Trim off the `dynamic+` prefix
- uri.Scheme = uri.Scheme[8:]
- l.Debugln(c, "looking up dynamic relays")
- req, err := http.NewRequestWithContext(ctx, "GET", uri.String(), nil)
- if err != nil {
- l.Debugln(c, "failed to lookup dynamic relays", err)
- return err
- }
- data, err := http.DefaultClient.Do(req)
- if err != nil {
- l.Debugln(c, "failed to lookup dynamic relays", err)
- return err
- }
- var ann dynamicAnnouncement
- err = json.NewDecoder(data.Body).Decode(&ann)
- data.Body.Close()
- if err != nil {
- l.Debugln(c, "failed to lookup dynamic relays", err)
- return err
- }
- var addrs []string
- for _, relayAnn := range ann.Relays {
- ruri, err := url.Parse(relayAnn.URL)
- if err != nil {
- l.Debugln(c, "failed to parse dynamic relay address", relayAnn.URL, err)
- continue
- }
- l.Debugln(c, "found", ruri)
- addrs = append(addrs, ruri.String())
- }
- for _, addr := range relayAddressesOrder(ctx, addrs) {
- select {
- case <-ctx.Done():
- l.Debugln(c, "stopping")
- return nil
- default:
- ruri, err := url.Parse(addr)
- if err != nil {
- l.Debugln(c, "skipping relay", addr, err)
- continue
- }
- client := newStaticClient(ruri, c.certs, c.invitations, c.timeout)
- c.mut.Lock()
- c.client = client
- c.mut.Unlock()
- err = c.client.Serve(ctx)
- l.Debugf("Disconnected from %s://%s: %v", c.client.URI().Scheme, c.client.URI().Host, err)
- c.mut.Lock()
- c.client = nil
- c.mut.Unlock()
- }
- }
- l.Debugln(c, "could not find a connectable relay")
- return errors.New("could not find a connectable relay")
- }
- func (c *dynamicClient) Error() error {
- c.mut.RLock()
- defer c.mut.RUnlock()
- if c.client == nil {
- return c.commonClient.Error()
- }
- return c.client.Error()
- }
- func (c *dynamicClient) String() string {
- return fmt.Sprintf("DynamicClient:%p:%s@%s", c, c.URI(), c.pooladdr)
- }
- func (c *dynamicClient) URI() *url.URL {
- c.mut.RLock()
- defer c.mut.RUnlock()
- if c.client == nil {
- return nil
- }
- return c.client.URI()
- }
- // This is the announcement received from the relay server;
- // {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
- type dynamicAnnouncement struct {
- Relays []struct {
- URL string
- }
- }
- // relayAddressesOrder checks the latency to each relay, rounds latency down to
- // the closest 50ms, and puts them in buckets of 50ms latency ranges. Then
- // shuffles each bucket, and returns all addresses starting with the ones from
- // the lowest latency bucket, ending with the highest latency bucket.
- func relayAddressesOrder(ctx context.Context, input []string) []string {
- buckets := make(map[int][]string)
- for _, relay := range input {
- latency, err := osutil.GetLatencyForURL(ctx, relay)
- if err != nil {
- latency = time.Hour
- }
- id := int(latency/time.Millisecond) / 50
- buckets[id] = append(buckets[id], relay)
- select {
- case <-ctx.Done():
- return nil
- default:
- }
- }
- var ids []int
- for id, bucket := range buckets {
- rand.Shuffle(bucket)
- ids = append(ids, id)
- }
- slices.Sort(ids)
- addresses := make([]string, 0, len(input))
- for _, id := range ids {
- addresses = append(addresses, buckets[id]...)
- }
- return addresses
- }
|