|
|
@@ -6,15 +6,21 @@ package sockstatlog
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
+ "crypto/sha256"
|
|
|
"encoding/json"
|
|
|
"io"
|
|
|
+ "net/http"
|
|
|
"os"
|
|
|
"path/filepath"
|
|
|
"time"
|
|
|
|
|
|
+ "tailscale.com/logpolicy"
|
|
|
+ "tailscale.com/logtail"
|
|
|
"tailscale.com/logtail/filch"
|
|
|
"tailscale.com/net/sockstats"
|
|
|
+ "tailscale.com/smallzstd"
|
|
|
"tailscale.com/types/logger"
|
|
|
+ "tailscale.com/types/logid"
|
|
|
"tailscale.com/util/mak"
|
|
|
)
|
|
|
|
|
|
@@ -26,9 +32,10 @@ type Logger struct {
|
|
|
ctx context.Context
|
|
|
cancelFn context.CancelFunc
|
|
|
|
|
|
- ticker *time.Ticker
|
|
|
- logf logger.Logf
|
|
|
- logbuffer *filch.Filch
|
|
|
+ ticker *time.Ticker
|
|
|
+ logf logger.Logf
|
|
|
+
|
|
|
+ logger *logtail.Logger
|
|
|
}
|
|
|
|
|
|
// deltaStat represents the bytes transferred during a time period.
|
|
|
@@ -50,10 +57,18 @@ type event struct {
|
|
|
Stats map[sockstats.Label]deltaStat `json:"s"`
|
|
|
}
|
|
|
|
|
|
+// SockstatLogID reproducibly derives a new logid.PrivateID for sockstat logging from a node's public backend log ID.
|
|
|
+// The returned PrivateID is the sha256 sum of id + "sockstat".
|
|
|
+// If a node's public log ID becomes known, it is trivial to spoof sockstat logs for that node.
|
|
|
+// Given the this is just for debugging, we're not too concerned about that.
|
|
|
+func SockstatLogID(id string) logid.PrivateID {
|
|
|
+ return logid.PrivateID(sha256.Sum256([]byte(id + "sockstat")))
|
|
|
+}
|
|
|
+
|
|
|
// NewLogger returns a new Logger that will store stats in logdir.
|
|
|
// On platforms that do not support sockstat logging, a nil Logger will be returned.
|
|
|
// The returned Logger must be shut down with Shutdown when it is no longer needed.
|
|
|
-func NewLogger(logdir string, logf logger.Logf) (*Logger, error) {
|
|
|
+func NewLogger(logdir string, logf logger.Logf, backendLogID string) (*Logger, error) {
|
|
|
if !sockstats.IsAvailable {
|
|
|
return nil, nil
|
|
|
}
|
|
|
@@ -69,12 +84,31 @@ func NewLogger(logdir string, logf logger.Logf) (*Logger, error) {
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
logger := &Logger{
|
|
|
- ctx: ctx,
|
|
|
- cancelFn: cancel,
|
|
|
- ticker: time.NewTicker(pollPeriod),
|
|
|
- logf: logf,
|
|
|
- logbuffer: filch,
|
|
|
+ ctx: ctx,
|
|
|
+ cancelFn: cancel,
|
|
|
+ ticker: time.NewTicker(pollPeriod),
|
|
|
+ logf: logf,
|
|
|
}
|
|
|
+ logger.logger = logtail.NewLogger(logtail.Config{
|
|
|
+ BaseURL: logpolicy.LogURL(),
|
|
|
+ PrivateID: SockstatLogID(backendLogID),
|
|
|
+ Collection: "sockstats.log.tailscale.io",
|
|
|
+ Buffer: filch,
|
|
|
+ NewZstdEncoder: func() logtail.Encoder {
|
|
|
+ w, err := smallzstd.NewEncoder(nil)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ return w
|
|
|
+ },
|
|
|
+ FlushDelayFn: func() time.Duration {
|
|
|
+ // set flush delay to 100 years so it never flushes automatically
|
|
|
+ return 100 * 365 * 24 * time.Hour
|
|
|
+ },
|
|
|
+ Stderr: io.Discard, // don't log to stderr
|
|
|
+
|
|
|
+ HTTPC: &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost)},
|
|
|
+ }, logf)
|
|
|
|
|
|
go logger.poll()
|
|
|
|
|
|
@@ -89,7 +123,7 @@ func (l *Logger) poll() {
|
|
|
var lastStats *sockstats.SockStats
|
|
|
var lastTime time.Time
|
|
|
|
|
|
- enc := json.NewEncoder(l.logbuffer)
|
|
|
+ enc := json.NewEncoder(l.logger)
|
|
|
for {
|
|
|
select {
|
|
|
case <-l.ctx.Done():
|
|
|
@@ -118,31 +152,22 @@ func (l *Logger) poll() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (l *Logger) Shutdown() {
|
|
|
- l.ticker.Stop()
|
|
|
- l.logbuffer.Close()
|
|
|
- l.cancelFn()
|
|
|
+func (l *Logger) LogID() string {
|
|
|
+ if l.logger == nil {
|
|
|
+ return ""
|
|
|
+ }
|
|
|
+ return l.logger.PrivateID().Public().String()
|
|
|
}
|
|
|
|
|
|
-// WriteLogs reads local logs, combining logs into events, and writes them to w.
|
|
|
-// Logs within eventWindow are combined into the same event.
|
|
|
-func (l *Logger) WriteLogs(w io.Writer) {
|
|
|
- if l == nil || l.logbuffer == nil {
|
|
|
- return
|
|
|
- }
|
|
|
- for {
|
|
|
- b, err := l.logbuffer.TryReadLine()
|
|
|
- if err != nil {
|
|
|
- l.logf("sockstatlog: error reading log: %v", err)
|
|
|
- return
|
|
|
- }
|
|
|
- if b == nil {
|
|
|
- // no more log messages
|
|
|
- return
|
|
|
- }
|
|
|
+// Flush sends pending logs to the log server and flushes them from the local buffer.
|
|
|
+func (l *Logger) Flush() {
|
|
|
+ l.logger.StartFlush()
|
|
|
+}
|
|
|
|
|
|
- w.Write(b)
|
|
|
- }
|
|
|
+func (l *Logger) Shutdown() {
|
|
|
+ l.ticker.Stop()
|
|
|
+ l.logger.Shutdown(context.Background())
|
|
|
+ l.cancelFn()
|
|
|
}
|
|
|
|
|
|
// delta calculates the delta stats between two SockStats snapshots.
|