| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- // Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
- package client
- import (
- "crypto/tls"
- "encoding/json"
- "fmt"
- "math/rand"
- "net/http"
- "net/url"
- "sort"
- "time"
- "github.com/syncthing/syncthing/lib/osutil"
- "github.com/syncthing/syncthing/lib/relay/protocol"
- "github.com/syncthing/syncthing/lib/sync"
- )
- type dynamicClient struct {
- pooladdr *url.URL
- certs []tls.Certificate
- invitations chan protocol.SessionInvitation
- closeInvitationsOnFinish bool
- mut sync.RWMutex
- client RelayClient
- stop chan struct{}
- }
- func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) RelayClient {
- closeInvitationsOnFinish := false
- if invitations == nil {
- closeInvitationsOnFinish = true
- invitations = make(chan protocol.SessionInvitation)
- }
- return &dynamicClient{
- pooladdr: uri,
- certs: certs,
- invitations: invitations,
- closeInvitationsOnFinish: closeInvitationsOnFinish,
- mut: sync.NewRWMutex(),
- }
- }
- func (c *dynamicClient) Serve() {
- c.mut.Lock()
- c.stop = make(chan struct{})
- c.mut.Unlock()
- uri := *c.pooladdr
- // Trim off the `dynamic+` prefix
- uri.Scheme = uri.Scheme[8:]
- l.Debugln(c, "looking up dynamic relays")
- data, err := http.Get(uri.String())
- if err != nil {
- l.Debugln(c, "failed to lookup dynamic relays", err)
- return
- }
- var ann dynamicAnnouncement
- err = json.NewDecoder(data.Body).Decode(&ann)
- data.Body.Close()
- if err != nil {
- l.Debugln(c, "failed to lookup dynamic relays", err)
- return
- }
- defer c.cleanup()
- var addrs []string
- for _, relayAnn := range ann.Relays {
- ruri, err := url.Parse(relayAnn.URL)
- if err != nil {
- l.Debugln(c, "failed to parse dynamic relay address", relayAnn.URL, err)
- continue
- }
- l.Debugln(c, "found", ruri)
- addrs = append(addrs, ruri.String())
- }
- for _, addr := range relayAddressesOrder(addrs) {
- select {
- case <-c.stop:
- l.Debugln(c, "stopping")
- return
- default:
- ruri, err := url.Parse(addr)
- if err != nil {
- l.Debugln(c, "skipping relay", addr, err)
- continue
- }
- client, err := NewClient(ruri, c.certs, c.invitations)
- if err != nil {
- continue
- }
- c.mut.Lock()
- c.client = client
- c.mut.Unlock()
- c.client.Serve()
- c.mut.Lock()
- c.client = nil
- c.mut.Unlock()
- }
- }
- l.Debugln(c, "could not find a connectable relay")
- }
- func (c *dynamicClient) Stop() {
- c.mut.RLock()
- defer c.mut.RUnlock()
- close(c.stop)
- if c.client == nil {
- return
- }
- c.client.Stop()
- }
- func (c *dynamicClient) StatusOK() bool {
- c.mut.RLock()
- defer c.mut.RUnlock()
- if c.client == nil {
- return false
- }
- return c.client.StatusOK()
- }
- func (c *dynamicClient) Latency() time.Duration {
- c.mut.RLock()
- defer c.mut.RUnlock()
- if c.client == nil {
- return time.Hour
- }
- return c.client.Latency()
- }
- func (c *dynamicClient) String() string {
- return fmt.Sprintf("DynamicClient:%p:%s@%s", c, c.URI(), c.pooladdr)
- }
- func (c *dynamicClient) URI() *url.URL {
- c.mut.RLock()
- defer c.mut.RUnlock()
- if c.client == nil {
- return c.pooladdr
- }
- return c.client.URI()
- }
- func (c *dynamicClient) Invitations() chan protocol.SessionInvitation {
- c.mut.RLock()
- inv := c.invitations
- c.mut.RUnlock()
- return inv
- }
- func (c *dynamicClient) cleanup() {
- c.mut.Lock()
- if c.closeInvitationsOnFinish {
- close(c.invitations)
- c.invitations = make(chan protocol.SessionInvitation)
- }
- c.mut.Unlock()
- }
- // This is the announcement recieved from the relay server;
- // {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
- type dynamicAnnouncement struct {
- Relays []struct {
- URL string
- }
- }
- // relayAddressesOrder checks the latency to each relay, rounds latency down to
- // the closest 50ms, and puts them in buckets of 50ms latency ranges. Then
- // shuffles each bucket, and returns all addresses starting with the ones from
- // the lowest latency bucket, ending with the highest latency buceket.
- func relayAddressesOrder(input []string) []string {
- buckets := make(map[int][]string)
- for _, relay := range input {
- latency, err := osutil.GetLatencyForURL(relay)
- if err != nil {
- latency = time.Hour
- }
- id := int(latency/time.Millisecond) / 50
- buckets[id] = append(buckets[id], relay)
- }
- var ids []int
- for id, bucket := range buckets {
- shuffle(bucket)
- ids = append(ids, id)
- }
- sort.Ints(ids)
- addresses := make([]string, len(input))
- for _, id := range ids {
- addresses = append(addresses, buckets[id]...)
- }
- return addresses
- }
- func shuffle(slice []string) {
- for i := len(slice) - 1; i > 0; i-- {
- j := rand.Intn(i + 1)
- slice[i], slice[j] = slice[j], slice[i]
- }
- }
|