|
|
@@ -38,11 +38,8 @@ import (
|
|
|
|
|
|
var (
|
|
|
flagDERPMap = flag.String("derp-map", "https://login.tailscale.com/derpmap/default", "URL to DERP map")
|
|
|
- flagOut = flag.String("out", "", "output sqlite filename")
|
|
|
flagInterval = flag.Duration("interval", time.Minute, "interval to probe at in time.ParseDuration() format")
|
|
|
- flagAPI = flag.String("api", "", "listen addr for HTTP API")
|
|
|
flagIPv6 = flag.Bool("ipv6", false, "probe IPv6 addresses")
|
|
|
- flagRetention = flag.Duration("retention", time.Hour*24*7, "sqlite retention period in time.ParseDuration() format")
|
|
|
flagRemoteWriteURL = flag.String("rw-url", "", "prometheus remote write URL")
|
|
|
flagInstance = flag.String("instance", "", "instance label value; defaults to hostname if unspecified")
|
|
|
flagDstPorts = flag.String("dst-ports", "", "comma-separated list of destination ports to monitor")
|
|
|
@@ -639,15 +636,9 @@ func main() {
|
|
|
if len(*flagDERPMap) < 1 {
|
|
|
log.Fatal("derp-map flag is unset")
|
|
|
}
|
|
|
- if len(*flagOut) < 1 {
|
|
|
- log.Fatal("out flag is unset")
|
|
|
- }
|
|
|
if *flagInterval < minInterval || *flagInterval > maxBufferDuration {
|
|
|
log.Fatalf("interval must be >= %s and <= %s", minInterval, maxBufferDuration)
|
|
|
}
|
|
|
- if *flagRetention < *flagInterval {
|
|
|
- log.Fatal("retention must be >= interval")
|
|
|
- }
|
|
|
if len(*flagRemoteWriteURL) < 1 {
|
|
|
log.Fatal("rw-url flag is unset")
|
|
|
}
|
|
|
@@ -693,49 +684,6 @@ func main() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- db, err := newDB(*flagOut)
|
|
|
- if err != nil {
|
|
|
- log.Fatalf("error opening output file for writing: %v", err)
|
|
|
- }
|
|
|
- defer db.Close()
|
|
|
-
|
|
|
- _, err = db.Exec("PRAGMA journal_mode=WAL")
|
|
|
- if err != nil {
|
|
|
- log.Fatalf("error enabling WAL mode: %v", err)
|
|
|
- }
|
|
|
-
|
|
|
- // No indices or primary key. Keep it simple for now. Reads will be full
|
|
|
- // scans. We can AUTOINCREMENT rowid in the future and hold an in-memory
|
|
|
- // index to at_unix if needed as reads are almost always going to be
|
|
|
- // time-bound (e.g. WHERE at_unix >= ?). At the time of authorship we have
|
|
|
- // ~300 data points per-interval w/o ipv6 w/kernel timestamping resulting
|
|
|
- // in ~2.6m rows in 24h w/a 10s probe interval.
|
|
|
- _, err = db.Exec(`
|
|
|
-CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT, address TEXT, timestamp_source INT, stable_conn INT, dst_port INT, rtt_ns INT)
|
|
|
-`)
|
|
|
- if err != nil {
|
|
|
- log.Fatalf("error initializing db: %v", err)
|
|
|
- }
|
|
|
-
|
|
|
- wg := sync.WaitGroup{}
|
|
|
- httpErrCh := make(chan error, 1)
|
|
|
- var httpServer *http.Server
|
|
|
- if len(*flagAPI) > 0 {
|
|
|
- api := newAPI(db)
|
|
|
- httpServer = &http.Server{
|
|
|
- Addr: *flagAPI,
|
|
|
- Handler: api,
|
|
|
- ReadTimeout: time.Second * 60,
|
|
|
- WriteTimeout: time.Second * 60,
|
|
|
- }
|
|
|
- wg.Add(1)
|
|
|
- go func() {
|
|
|
- err := httpServer.ListenAndServe()
|
|
|
- httpErrCh <- err
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- }
|
|
|
-
|
|
|
tsCh := make(chan []prompb.TimeSeries, maxBufferDuration / *flagInterval)
|
|
|
remoteWriteDoneCh := make(chan struct{})
|
|
|
rwc := newRemoteWriteClient(*flagRemoteWriteURL)
|
|
|
@@ -745,9 +693,6 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
|
|
|
}()
|
|
|
|
|
|
shutdown := func() {
|
|
|
- if httpServer != nil {
|
|
|
- httpServer.Close()
|
|
|
- }
|
|
|
close(tsCh)
|
|
|
select {
|
|
|
case <-time.After(time.Second * 10): // give goroutine some time to flush
|
|
|
@@ -766,7 +711,6 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
|
|
|
cancel()
|
|
|
}
|
|
|
|
|
|
- wg.Wait()
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -787,20 +731,9 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
|
|
|
defer derpMapTicker.Stop()
|
|
|
probeTicker := time.NewTicker(*flagInterval)
|
|
|
defer probeTicker.Stop()
|
|
|
- cleanupTicker := time.NewTicker(time.Hour)
|
|
|
- defer cleanupTicker.Stop()
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
- case <-cleanupTicker.C:
|
|
|
- older := time.Now().Add(-*flagRetention)
|
|
|
- log.Printf("cleaning up measurements older than %v", older)
|
|
|
- _, err := db.Exec("DELETE FROM rtt WHERE at_unix < ?", older.Unix())
|
|
|
- if err != nil {
|
|
|
- log.Printf("error cleaning up old data: %v", err)
|
|
|
- shutdown()
|
|
|
- return
|
|
|
- }
|
|
|
case <-probeTicker.C:
|
|
|
results, err := probeNodes(nodeMetaByAddr, stableConns, dstPorts)
|
|
|
if err != nil {
|
|
|
@@ -819,32 +752,6 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
|
|
|
tsCh <- ts
|
|
|
}
|
|
|
}
|
|
|
- tx, err := db.Begin()
|
|
|
- if err != nil {
|
|
|
- log.Printf("error beginning sqlite tx: %v", err)
|
|
|
- shutdown()
|
|
|
- return
|
|
|
- }
|
|
|
- for _, result := range results {
|
|
|
- af := 4
|
|
|
- if result.key.meta.addr.Is6() {
|
|
|
- af = 6
|
|
|
- }
|
|
|
- _, err = tx.Exec("INSERT INTO rtt(at_unix, region_id, hostname, af, address, timestamp_source, stable_conn, dst_port, rtt_ns) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
|
- result.at.Unix(), result.key.meta.regionID, result.key.meta.hostname, af, result.key.meta.addr.String(), result.key.timestampSource, result.key.connStability, result.key.dstPort, result.rtt)
|
|
|
- if err != nil {
|
|
|
- tx.Rollback()
|
|
|
- log.Printf("error adding result to tx: %v", err)
|
|
|
- shutdown()
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- err = tx.Commit()
|
|
|
- if err != nil {
|
|
|
- log.Printf("error committing tx: %v", err)
|
|
|
- shutdown()
|
|
|
- return
|
|
|
- }
|
|
|
case dm := <-dmCh:
|
|
|
staleMeta, err := nodeMetaFromDERPMap(dm, nodeMetaByAddr, *flagIPv6)
|
|
|
if err != nil {
|
|
|
@@ -874,10 +781,6 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
|
|
|
dmCh <- updatedDM
|
|
|
}
|
|
|
}()
|
|
|
- case err := <-httpErrCh:
|
|
|
- log.Printf("http server error: %v", err)
|
|
|
- shutdown()
|
|
|
- return
|
|
|
case <-sigCh:
|
|
|
shutdown()
|
|
|
return
|