|
|
@@ -18,12 +18,16 @@ import (
|
|
|
"net"
|
|
|
"net/http"
|
|
|
"net/url"
|
|
|
+ "os"
|
|
|
"path/filepath"
|
|
|
+ "strconv"
|
|
|
"strings"
|
|
|
"time"
|
|
|
|
|
|
"github.com/golang/groupcache/lru"
|
|
|
"github.com/oschwald/geoip2-golang"
|
|
|
+ "github.com/prometheus/client_golang/prometheus"
|
|
|
+ "github.com/prometheus/client_golang/prometheus/promhttp"
|
|
|
"github.com/syncthing/syncthing/cmd/strelaypoolsrv/auto"
|
|
|
"github.com/syncthing/syncthing/lib/relay/client"
|
|
|
"github.com/syncthing/syncthing/lib/sync"
|
|
|
@@ -34,12 +38,42 @@ import (
|
|
|
type location struct {
|
|
|
Latitude float64 `json:"latitude"`
|
|
|
Longitude float64 `json:"longitude"`
|
|
|
+ City string `json:"city"`
|
|
|
+ Country string `json:"country"`
|
|
|
+ Continent string `json:"continent"`
|
|
|
}
|
|
|
|
|
|
type relay struct {
|
|
|
- URL string `json:"url"`
|
|
|
- Location location `json:"location"`
|
|
|
- uri *url.URL
|
|
|
+ URL string `json:"url"`
|
|
|
+ Location location `json:"location"`
|
|
|
+ uri *url.URL
|
|
|
+ Stats *stats `json:"stats"`
|
|
|
+ StatsRetrieved time.Time `json:"statsRetrieved"`
|
|
|
+}
|
|
|
+
|
|
|
+type stats struct {
|
|
|
+ StartTime time.Time `json:"startTime"`
|
|
|
+ UptimeSeconds int `json:"uptimeSeconds"`
|
|
|
+ PendingSessionKeys int `json:"numPendingSessionKeys"`
|
|
|
+ ActiveSessions int `json:"numActiveSessions"`
|
|
|
+ Connections int `json:"numConnections"`
|
|
|
+ Proxies int `json:"numProxies"`
|
|
|
+ BytesProxied int `json:"bytesProxied"`
|
|
|
+ GoVersion string `json:"goVersion"`
|
|
|
+ GoOS string `json:"goOS"`
|
|
|
+ GoArch string `json:"goArch"`
|
|
|
+ GoMaxProcs int `json:"goMaxProcs"`
|
|
|
+ GoRoutines int `json:"goNumRoutine"`
|
|
|
+ Rates []int64 `json:"kbps10s1m5m15m30m60m"`
|
|
|
+ Options struct {
|
|
|
+ NetworkTimeout int `json:"network-timeout"`
|
|
|
+ PintInterval int `json:"ping-interval"`
|
|
|
+ MessageTimeout int `json:"message-timeout"`
|
|
|
+ SessionRate int `json:"per-session-rate"`
|
|
|
+ GlobalRate int `json:"global-rate"`
|
|
|
+ Pools []string `json:"pools"`
|
|
|
+ ProvidedBy string `json:"provided-by"`
|
|
|
+ } `json:"options"`
|
|
|
}
|
|
|
|
|
|
func (r relay) String() string {
|
|
|
@@ -47,9 +81,9 @@ func (r relay) String() string {
|
|
|
}
|
|
|
|
|
|
type request struct {
|
|
|
- relay relay
|
|
|
- uri *url.URL
|
|
|
- result chan result
|
|
|
+ relay *relay
|
|
|
+ result chan result
|
|
|
+ queueTimer *prometheus.Timer
|
|
|
}
|
|
|
|
|
|
type result struct {
|
|
|
@@ -58,23 +92,25 @@ type result struct {
|
|
|
}
|
|
|
|
|
|
var (
|
|
|
- testCert tls.Certificate
|
|
|
- listen = ":80"
|
|
|
- dir string
|
|
|
- evictionTime = time.Hour
|
|
|
- debug bool
|
|
|
- getLRUSize = 10 << 10
|
|
|
- getLimitBurst = 10
|
|
|
- getLimitAvg = 1
|
|
|
- postLRUSize = 1 << 10
|
|
|
- postLimitBurst = 2
|
|
|
- postLimitAvg = 1
|
|
|
- getLimit time.Duration
|
|
|
- postLimit time.Duration
|
|
|
- permRelaysFile string
|
|
|
- ipHeader string
|
|
|
- geoipPath string
|
|
|
- proto string
|
|
|
+ testCert tls.Certificate
|
|
|
+ knownRelaysFile = filepath.Join(os.TempDir(), "strelaypoolsrv_known_relays")
|
|
|
+ listen = ":80"
|
|
|
+ dir string
|
|
|
+ evictionTime = time.Hour
|
|
|
+ debug bool
|
|
|
+ getLRUSize = 10 << 10
|
|
|
+ getLimitBurst = 10
|
|
|
+ getLimitAvg = 2
|
|
|
+ postLRUSize = 1 << 10
|
|
|
+ postLimitBurst = 2
|
|
|
+ postLimitAvg = 2
|
|
|
+ getLimit time.Duration
|
|
|
+ postLimit time.Duration
|
|
|
+ permRelaysFile string
|
|
|
+ ipHeader string
|
|
|
+ geoipPath string
|
|
|
+ proto string
|
|
|
+ statsRefresh = time.Minute / 2
|
|
|
|
|
|
getMut = sync.NewRWMutex()
|
|
|
getLRUCache *lru.Cache
|
|
|
@@ -85,8 +121,8 @@ var (
|
|
|
requests = make(chan request, 10)
|
|
|
|
|
|
mut = sync.NewRWMutex()
|
|
|
- knownRelays = make([]relay, 0)
|
|
|
- permanentRelays = make([]relay, 0)
|
|
|
+ knownRelays = make([]*relay, 0)
|
|
|
+ permanentRelays = make([]*relay, 0)
|
|
|
evictionTimers = make(map[string]*time.Timer)
|
|
|
)
|
|
|
|
|
|
@@ -100,15 +136,16 @@ func main() {
|
|
|
flag.BoolVar(&debug, "debug", debug, "Enable debug output")
|
|
|
flag.DurationVar(&evictionTime, "eviction", evictionTime, "After how long the relay is evicted")
|
|
|
flag.IntVar(&getLRUSize, "get-limit-cache", getLRUSize, "Get request limiter cache size")
|
|
|
- flag.IntVar(&getLimitAvg, "get-limit-avg", 2, "Allowed average get request rate, per 10 s")
|
|
|
+ flag.IntVar(&getLimitAvg, "get-limit-avg", getLimitAvg, "Allowed average get request rate, per 10 s")
|
|
|
flag.IntVar(&getLimitBurst, "get-limit-burst", getLimitBurst, "Allowed burst get requests")
|
|
|
flag.IntVar(&postLRUSize, "post-limit-cache", postLRUSize, "Post request limiter cache size")
|
|
|
- flag.IntVar(&postLimitAvg, "post-limit-avg", 2, "Allowed average post request rate, per minute")
|
|
|
+ flag.IntVar(&postLimitAvg, "post-limit-avg", postLimitAvg, "Allowed average post request rate, per minute")
|
|
|
flag.IntVar(&postLimitBurst, "post-limit-burst", postLimitBurst, "Allowed burst post requests")
|
|
|
flag.StringVar(&permRelaysFile, "perm-relays", "", "Path to list of permanent relays")
|
|
|
flag.StringVar(&ipHeader, "ip-header", "", "Name of header which holds clients ip:port. Only meaningful when running behind a reverse proxy.")
|
|
|
flag.StringVar(&geoipPath, "geoip", "GeoLite2-City.mmdb", "Path to GeoLite2-City database")
|
|
|
flag.StringVar(&proto, "protocol", "tcp", "Protocol used for listening. 'tcp' for IPv4 and IPv6, 'tcp4' for IPv4, 'tcp6' for IPv6")
|
|
|
+ flag.DurationVar(&statsRefresh, "stats-refresh", statsRefresh, "Interval at which to refresh relay stats")
|
|
|
|
|
|
flag.Parse()
|
|
|
|
|
|
@@ -122,13 +159,31 @@ func main() {
|
|
|
var err error
|
|
|
|
|
|
if permRelaysFile != "" {
|
|
|
- loadPermanentRelays(permRelaysFile)
|
|
|
+ permanentRelays = loadRelays(permRelaysFile)
|
|
|
}
|
|
|
|
|
|
testCert = createTestCertificate()
|
|
|
|
|
|
go requestProcessor()
|
|
|
|
|
|
+ // Load relays from cache in the background.
|
|
|
+ // Load them in a serial fashion to make sure any genuine requests
|
|
|
+ // are not dropped.
|
|
|
+ go func() {
|
|
|
+ for _, relay := range loadRelays(knownRelaysFile) {
|
|
|
+ resultChan := make(chan result)
|
|
|
+ requests <- request{relay, resultChan, nil}
|
|
|
+ result := <-resultChan
|
|
|
+ if result.err != nil {
|
|
|
+ relayTestsTotal.WithLabelValues("failed").Inc()
|
|
|
+ } else {
|
|
|
+ relayTestsTotal.WithLabelValues("success").Inc()
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Run the the stats refresher once the relays are loaded.
|
|
|
+ statsRefresher(statsRefresh)
|
|
|
+ }()
|
|
|
+
|
|
|
if dir != "" {
|
|
|
if debug {
|
|
|
log.Println("Starting TLS listener on", listen)
|
|
|
@@ -173,6 +228,7 @@ func main() {
|
|
|
handler := http.NewServeMux()
|
|
|
handler.HandleFunc("/", handleAssets)
|
|
|
handler.HandleFunc("/endpoint", handleRequest)
|
|
|
+ handler.HandleFunc("/metrics", handleMetrics)
|
|
|
|
|
|
srv := http.Server{
|
|
|
Handler: handler,
|
|
|
@@ -185,6 +241,15 @@ func main() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func handleMetrics(w http.ResponseWriter, r *http.Request) {
|
|
|
+ timer := prometheus.NewTimer(metricsRequestsSeconds)
|
|
|
+ // Acquire the mutex just to make sure we're not caught mid-way stats collection
|
|
|
+ mut.RLock()
|
|
|
+ promhttp.Handler().ServeHTTP(w, r)
|
|
|
+ mut.RUnlock()
|
|
|
+ timer.ObserveDuration()
|
|
|
+}
|
|
|
+
|
|
|
func handleAssets(w http.ResponseWriter, r *http.Request) {
|
|
|
assets := auto.Assets()
|
|
|
path := r.URL.Path[1:]
|
|
|
@@ -245,6 +310,15 @@ func mimeTypeForFile(file string) string {
|
|
|
}
|
|
|
|
|
|
func handleRequest(w http.ResponseWriter, r *http.Request) {
|
|
|
+ timer := prometheus.NewTimer(apiRequestsSeconds.WithLabelValues(r.Method))
|
|
|
+
|
|
|
+ lw := NewLoggingResponseWriter(w)
|
|
|
+
|
|
|
+ defer func() {
|
|
|
+ timer.ObserveDuration()
|
|
|
+ apiRequestsTotal.WithLabelValues(r.Method, strconv.Itoa(lw.statusCode)).Inc()
|
|
|
+ }()
|
|
|
+
|
|
|
if ipHeader != "" {
|
|
|
r.RemoteAddr = r.Header.Get(ipHeader)
|
|
|
}
|
|
|
@@ -252,13 +326,13 @@ func handleRequest(w http.ResponseWriter, r *http.Request) {
|
|
|
switch r.Method {
|
|
|
case "GET":
|
|
|
if limit(r.RemoteAddr, getLRUCache, getMut, getLimit, getLimitBurst) {
|
|
|
- w.WriteHeader(429)
|
|
|
+ w.WriteHeader(httpStatusEnhanceYourCalm)
|
|
|
return
|
|
|
}
|
|
|
handleGetRequest(w, r)
|
|
|
case "POST":
|
|
|
if limit(r.RemoteAddr, postLRUCache, postMut, postLimit, postLimitBurst) {
|
|
|
- w.WriteHeader(429)
|
|
|
+ w.WriteHeader(httpStatusEnhanceYourCalm)
|
|
|
return
|
|
|
}
|
|
|
handlePostRequest(w, r)
|
|
|
@@ -282,7 +356,7 @@ func handleGetRequest(w http.ResponseWriter, r *http.Request) {
|
|
|
relays[i], relays[j] = relays[j], relays[i]
|
|
|
}
|
|
|
|
|
|
- json.NewEncoder(w).Encode(map[string][]relay{
|
|
|
+ json.NewEncoder(w).Encode(map[string][]*relay{
|
|
|
"relays": relays,
|
|
|
})
|
|
|
}
|
|
|
@@ -333,11 +407,11 @@ func handlePostRequest(w http.ResponseWriter, r *http.Request) {
|
|
|
if debug {
|
|
|
log.Println("IP address advertised does not match client IP address", r.RemoteAddr, uri)
|
|
|
}
|
|
|
- http.Error(w, "IP address does not match client IP", http.StatusUnauthorized)
|
|
|
+ http.Error(w, fmt.Sprintf("IP advertised %s does not match client IP %s", host, rhost), http.StatusUnauthorized)
|
|
|
return
|
|
|
}
|
|
|
+
|
|
|
newRelay.uri = uri
|
|
|
- newRelay.Location = getLocation(uri.Host)
|
|
|
|
|
|
for _, current := range permanentRelays {
|
|
|
if current.uri.Host == newRelay.uri.Host {
|
|
|
@@ -352,18 +426,21 @@ func handlePostRequest(w http.ResponseWriter, r *http.Request) {
|
|
|
reschan := make(chan result)
|
|
|
|
|
|
select {
|
|
|
- case requests <- request{newRelay, uri, reschan}:
|
|
|
+ case requests <- request{&newRelay, reschan, prometheus.NewTimer(relayTestActionsSeconds.WithLabelValues("queue"))}:
|
|
|
result := <-reschan
|
|
|
if result.err != nil {
|
|
|
+ relayTestsTotal.WithLabelValues("failed").Inc()
|
|
|
http.Error(w, result.err.Error(), http.StatusBadRequest)
|
|
|
return
|
|
|
}
|
|
|
+ relayTestsTotal.WithLabelValues("success").Inc()
|
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
|
json.NewEncoder(w).Encode(map[string]time.Duration{
|
|
|
"evictionIn": result.eviction,
|
|
|
})
|
|
|
|
|
|
default:
|
|
|
+ relayTestsTotal.WithLabelValues("dropped").Inc()
|
|
|
if debug {
|
|
|
log.Println("Dropping request")
|
|
|
}
|
|
|
@@ -373,57 +450,81 @@ func handlePostRequest(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
|
func requestProcessor() {
|
|
|
for request := range requests {
|
|
|
+ if request.queueTimer != nil {
|
|
|
+ request.queueTimer.ObserveDuration()
|
|
|
+ }
|
|
|
+
|
|
|
+ timer := prometheus.NewTimer(relayTestActionsSeconds.WithLabelValues("test"))
|
|
|
+ handleRelayTest(request)
|
|
|
+ timer.ObserveDuration()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func handleRelayTest(request request) {
|
|
|
+ if debug {
|
|
|
+ log.Println("Request for", request.relay)
|
|
|
+ }
|
|
|
+ if !client.TestRelay(request.relay.uri, []tls.Certificate{testCert}, time.Second, 2*time.Second, 3) {
|
|
|
if debug {
|
|
|
- log.Println("Request for", request.relay)
|
|
|
+ log.Println("Test for relay", request.relay, "failed")
|
|
|
}
|
|
|
- if !client.TestRelay(request.uri, []tls.Certificate{testCert}, time.Second, 2*time.Second, 3) {
|
|
|
- if debug {
|
|
|
- log.Println("Test for relay", request.relay, "failed")
|
|
|
- }
|
|
|
- request.result <- result{fmt.Errorf("connection test failed"), 0}
|
|
|
- continue
|
|
|
+ request.result <- result{fmt.Errorf("connection test failed"), 0}
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ stats := fetchStats(request.relay)
|
|
|
+ location := getLocation(request.relay.uri.Host)
|
|
|
+
|
|
|
+ mut.Lock()
|
|
|
+ if stats != nil {
|
|
|
+ updateMetrics(request.relay.uri.Host, stats, location)
|
|
|
+ }
|
|
|
+ request.relay.Stats = stats
|
|
|
+ request.relay.StatsRetrieved = time.Now()
|
|
|
+ request.relay.Location = location
|
|
|
+
|
|
|
+ timer, ok := evictionTimers[request.relay.uri.Host]
|
|
|
+ if ok {
|
|
|
+ if debug {
|
|
|
+ log.Println("Stopping existing timer for", request.relay)
|
|
|
}
|
|
|
+ timer.Stop()
|
|
|
+ }
|
|
|
|
|
|
- mut.Lock()
|
|
|
- timer, ok := evictionTimers[request.relay.uri.Host]
|
|
|
- if ok {
|
|
|
+ for i, current := range knownRelays {
|
|
|
+ if current.uri.Host == request.relay.uri.Host {
|
|
|
if debug {
|
|
|
- log.Println("Stopping existing timer for", request.relay)
|
|
|
+ log.Println("Relay", request.relay, "already exists")
|
|
|
}
|
|
|
- timer.Stop()
|
|
|
- }
|
|
|
|
|
|
- for i, current := range knownRelays {
|
|
|
- if current.uri.Host == request.relay.uri.Host {
|
|
|
- if debug {
|
|
|
- log.Println("Relay", request.relay, "already exists")
|
|
|
- }
|
|
|
+ // Evict the old entry anyway, as configuration might have changed.
|
|
|
+ last := len(knownRelays) - 1
|
|
|
+ knownRelays[i] = knownRelays[last]
|
|
|
+ knownRelays = knownRelays[:last]
|
|
|
|
|
|
- // Evict the old entry anyway, as configuration might have changed.
|
|
|
- last := len(knownRelays) - 1
|
|
|
- knownRelays[i] = knownRelays[last]
|
|
|
- knownRelays = knownRelays[:last]
|
|
|
-
|
|
|
- goto found
|
|
|
- }
|
|
|
+ goto found
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if debug {
|
|
|
- log.Println("Adding new relay", request.relay)
|
|
|
- }
|
|
|
+ if debug {
|
|
|
+ log.Println("Adding new relay", request.relay)
|
|
|
+ }
|
|
|
+
|
|
|
+found:
|
|
|
|
|
|
- found:
|
|
|
+ knownRelays = append(knownRelays, request.relay)
|
|
|
+ evictionTimers[request.relay.uri.Host] = time.AfterFunc(evictionTime, evict(request.relay))
|
|
|
|
|
|
- knownRelays = append(knownRelays, request.relay)
|
|
|
+ mut.Unlock()
|
|
|
|
|
|
- evictionTimers[request.relay.uri.Host] = time.AfterFunc(evictionTime, evict(request.relay))
|
|
|
- mut.Unlock()
|
|
|
- request.result <- result{nil, evictionTime}
|
|
|
+ if err := saveRelays(knownRelaysFile, knownRelays); err != nil {
|
|
|
+ log.Println("Failed to write known relays: " + err.Error())
|
|
|
}
|
|
|
|
|
|
+ request.result <- result{nil, evictionTime}
|
|
|
}
|
|
|
|
|
|
-func evict(relay relay) func() {
|
|
|
+func evict(relay *relay) func() {
|
|
|
return func() {
|
|
|
mut.Lock()
|
|
|
defer mut.Unlock()
|
|
|
@@ -438,6 +539,7 @@ func evict(relay relay) func() {
|
|
|
last := len(knownRelays) - 1
|
|
|
knownRelays[i] = knownRelays[last]
|
|
|
knownRelays = knownRelays[:last]
|
|
|
+ deleteMetrics(current.uri.Host)
|
|
|
}
|
|
|
}
|
|
|
delete(evictionTimers, relay.uri.Host)
|
|
|
@@ -466,12 +568,14 @@ func limit(addr string, cache *lru.Cache, lock sync.RWMutex, intv time.Duration,
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
-func loadPermanentRelays(file string) {
|
|
|
+func loadRelays(file string) []*relay {
|
|
|
content, err := ioutil.ReadFile(file)
|
|
|
if err != nil {
|
|
|
- log.Fatal(err)
|
|
|
+ log.Println("Failed to load relays: " + err.Error())
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
+ var relays []*relay
|
|
|
for _, line := range strings.Split(string(content), "\n") {
|
|
|
if len(line) == 0 {
|
|
|
continue
|
|
|
@@ -480,21 +584,30 @@ func loadPermanentRelays(file string) {
|
|
|
uri, err := url.Parse(line)
|
|
|
if err != nil {
|
|
|
if debug {
|
|
|
- log.Println("Skipping permanent relay", line, "due to parse error", err)
|
|
|
+ log.Println("Skipping relay", line, "due to parse error", err)
|
|
|
}
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
- permanentRelays = append(permanentRelays, relay{
|
|
|
+ relays = append(relays, &relay{
|
|
|
URL: line,
|
|
|
Location: getLocation(uri.Host),
|
|
|
uri: uri,
|
|
|
})
|
|
|
if debug {
|
|
|
- log.Println("Adding permanent relay", line)
|
|
|
+ log.Println("Adding relay", line)
|
|
|
}
|
|
|
}
|
|
|
+ return relays
|
|
|
+}
|
|
|
+
|
|
|
+func saveRelays(file string, relays []*relay) error {
|
|
|
+ var content string
|
|
|
+ for _, relay := range relays {
|
|
|
+ content += relay.uri.String() + "\n"
|
|
|
+ }
|
|
|
+ return ioutil.WriteFile(file, []byte(content), 0777)
|
|
|
}
|
|
|
|
|
|
func createTestCertificate() tls.Certificate {
|
|
|
@@ -513,6 +626,8 @@ func createTestCertificate() tls.Certificate {
|
|
|
}
|
|
|
|
|
|
func getLocation(host string) location {
|
|
|
+ timer := prometheus.NewTimer(locationLookupSeconds)
|
|
|
+ defer timer.ObserveDuration()
|
|
|
db, err := geoip2.Open(geoipPath)
|
|
|
if err != nil {
|
|
|
return location{}
|
|
|
@@ -530,7 +645,24 @@ func getLocation(host string) location {
|
|
|
}
|
|
|
|
|
|
return location{
|
|
|
- Latitude: city.Location.Latitude,
|
|
|
Longitude: city.Location.Longitude,
|
|
|
+ Latitude: city.Location.Latitude,
|
|
|
+ City: city.City.Names["en"],
|
|
|
+ Country: city.Country.IsoCode,
|
|
|
+ Continent: city.Continent.Code,
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+type loggingResponseWriter struct {
|
|
|
+ http.ResponseWriter
|
|
|
+ statusCode int
|
|
|
+}
|
|
|
+
|
|
|
+func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
|
|
|
+ return &loggingResponseWriter{w, http.StatusOK}
|
|
|
+}
|
|
|
+
|
|
|
+func (lrw *loggingResponseWriter) WriteHeader(code int) {
|
|
|
+ lrw.statusCode = code
|
|
|
+ lrw.ResponseWriter.WriteHeader(code)
|
|
|
+}
|