| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413 |
- package networkquality
- import (
- "context"
- "crypto/tls"
- "encoding/json"
- "io"
- "math"
- "math/rand"
- "net/http"
- "net/http/httptrace"
- "net/url"
- "sort"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- sBufio "github.com/sagernet/sing/common/bufio"
- E "github.com/sagernet/sing/common/exceptions"
- N "github.com/sagernet/sing/common/network"
- )
- const DefaultConfigURL = "https://mensura.cdn-apple.com/api/v1/gm/config"
- type Config struct {
- Version int `json:"version"`
- TestEndpoint string `json:"test_endpoint"`
- URLs URLs `json:"urls"`
- }
- type URLs struct {
- SmallHTTPSDownloadURL string `json:"small_https_download_url"`
- LargeHTTPSDownloadURL string `json:"large_https_download_url"`
- HTTPSUploadURL string `json:"https_upload_url"`
- SmallDownloadURL string `json:"small_download_url"`
- LargeDownloadURL string `json:"large_download_url"`
- UploadURL string `json:"upload_url"`
- }
- func (u *URLs) smallDownloadURL() string {
- if u.SmallHTTPSDownloadURL != "" {
- return u.SmallHTTPSDownloadURL
- }
- return u.SmallDownloadURL
- }
- func (u *URLs) largeDownloadURL() string {
- if u.LargeHTTPSDownloadURL != "" {
- return u.LargeHTTPSDownloadURL
- }
- return u.LargeDownloadURL
- }
- func (u *URLs) uploadURL() string {
- if u.HTTPSUploadURL != "" {
- return u.HTTPSUploadURL
- }
- return u.UploadURL
- }
- type Accuracy int32
- const (
- AccuracyLow Accuracy = 0
- AccuracyMedium Accuracy = 1
- AccuracyHigh Accuracy = 2
- )
- func (a Accuracy) String() string {
- switch a {
- case AccuracyHigh:
- return "High"
- case AccuracyMedium:
- return "Medium"
- default:
- return "Low"
- }
- }
- type Result struct {
- DownloadCapacity int64
- UploadCapacity int64
- DownloadRPM int32
- UploadRPM int32
- IdleLatencyMs int32
- DownloadCapacityAccuracy Accuracy
- UploadCapacityAccuracy Accuracy
- DownloadRPMAccuracy Accuracy
- UploadRPMAccuracy Accuracy
- }
- type Progress struct {
- Phase Phase
- DownloadCapacity int64
- UploadCapacity int64
- DownloadRPM int32
- UploadRPM int32
- IdleLatencyMs int32
- ElapsedMs int64
- DownloadCapacityAccuracy Accuracy
- UploadCapacityAccuracy Accuracy
- DownloadRPMAccuracy Accuracy
- UploadRPMAccuracy Accuracy
- }
- type Phase int32
- const (
- PhaseIdle Phase = 0
- PhaseDownload Phase = 1
- PhaseUpload Phase = 2
- PhaseDone Phase = 3
- )
- type Options struct {
- ConfigURL string
- HTTPClient *http.Client
- NewMeasurementClient MeasurementClientFactory
- Serial bool
- MaxRuntime time.Duration
- OnProgress func(Progress)
- Context context.Context
- }
- const DefaultMaxRuntime = 20 * time.Second
- type measurementSettings struct {
- idleProbeCount int
- testTimeout time.Duration
- stabilityInterval time.Duration
- sampleInterval time.Duration
- progressInterval time.Duration
- maxProbesPerSecond int
- initialConnections int
- maxConnections int
- movingAvgDistance int
- trimPercent int
- stdDevTolerancePct float64
- maxProbeCapacityPct float64
- }
- var settings = measurementSettings{
- idleProbeCount: 5,
- testTimeout: DefaultMaxRuntime,
- stabilityInterval: time.Second,
- sampleInterval: 250 * time.Millisecond,
- progressInterval: 500 * time.Millisecond,
- maxProbesPerSecond: 100,
- initialConnections: 1,
- maxConnections: 16,
- movingAvgDistance: 4,
- trimPercent: 5,
- stdDevTolerancePct: 5,
- maxProbeCapacityPct: 0.05,
- }
- type resolvedConfig struct {
- smallURL *url.URL
- largeURL *url.URL
- uploadURL *url.URL
- connectEndpoint string
- }
- type directionPlan struct {
- dataURL *url.URL
- probeURL *url.URL
- connectEndpoint string
- isUpload bool
- }
- type probeTrace struct {
- reused bool
- connectStart time.Time
- connectDone time.Time
- tlsStart time.Time
- tlsDone time.Time
- tlsVersion uint16
- gotConn time.Time
- wroteRequest time.Time
- firstResponseByte time.Time
- }
- type probeMeasurement struct {
- total time.Duration
- tcp time.Duration
- tls time.Duration
- httpFirst time.Duration
- httpLoaded time.Duration
- bytes int64
- reused bool
- }
- type probeRound struct {
- interval int
- tcp time.Duration
- tls time.Duration
- httpFirst time.Duration
- httpLoaded time.Duration
- }
- func (p probeRound) responsivenessLatency() float64 {
- var foreignSamples []float64
- if p.tcp > 0 {
- foreignSamples = append(foreignSamples, durationMillis(p.tcp))
- }
- if p.tls > 0 {
- foreignSamples = append(foreignSamples, durationMillis(p.tls))
- }
- if p.httpFirst > 0 {
- foreignSamples = append(foreignSamples, durationMillis(p.httpFirst))
- }
- if len(foreignSamples) == 0 || p.httpLoaded <= 0 {
- return 0
- }
- return (meanFloat64s(foreignSamples) + durationMillis(p.httpLoaded)) / 2
- }
- const maxConsecutiveErrors = 3
- type loadConnection struct {
- client *http.Client
- dataURL *url.URL
- isUpload bool
- active atomic.Bool
- ready atomic.Bool
- }
- func (c *loadConnection) run(ctx context.Context, onError func(error)) {
- defer c.client.CloseIdleConnections()
- markActive := func() {
- c.ready.Store(true)
- c.active.Store(true)
- }
- var consecutiveErrors int
- for {
- select {
- case <-ctx.Done():
- return
- default:
- }
- var err error
- if c.isUpload {
- err = runUploadRequest(ctx, c.client, c.dataURL.String(), markActive)
- } else {
- err = runDownloadRequest(ctx, c.client, c.dataURL.String(), markActive)
- }
- c.active.Store(false)
- if err != nil {
- if ctx.Err() != nil {
- return
- }
- consecutiveErrors++
- if consecutiveErrors > maxConsecutiveErrors {
- onError(err)
- return
- }
- c.client.CloseIdleConnections()
- continue
- }
- consecutiveErrors = 0
- }
- }
- type intervalThroughput struct {
- interval int
- bps float64
- }
- type intervalWindow struct {
- lower int
- upper int
- }
- type stabilityTracker struct {
- window int
- stdDevTolerancePct float64
- instantaneous []float64
- movingAverages []float64
- }
- func (s *stabilityTracker) add(value float64) bool {
- if value <= 0 || math.IsNaN(value) || math.IsInf(value, 0) {
- return false
- }
- s.instantaneous = append(s.instantaneous, value)
- if len(s.instantaneous) > s.window {
- s.instantaneous = s.instantaneous[len(s.instantaneous)-s.window:]
- }
- s.movingAverages = append(s.movingAverages, meanFloat64s(s.instantaneous))
- if len(s.movingAverages) > s.window {
- s.movingAverages = s.movingAverages[len(s.movingAverages)-s.window:]
- }
- return s.stable()
- }
- func (s *stabilityTracker) ready() bool {
- return len(s.movingAverages) >= s.window
- }
- func (s *stabilityTracker) accuracy() Accuracy {
- if s.stable() {
- return AccuracyHigh
- }
- if s.ready() {
- return AccuracyMedium
- }
- return AccuracyLow
- }
- func (s *stabilityTracker) stable() bool {
- if len(s.movingAverages) < s.window {
- return false
- }
- currentAverage := s.movingAverages[len(s.movingAverages)-1]
- if currentAverage <= 0 {
- return false
- }
- return stdDevFloat64s(s.movingAverages) <= currentAverage*(s.stdDevTolerancePct/100)
- }
- type directionMeasurement struct {
- capacity int64
- rpm int32
- capacityAccuracy Accuracy
- rpmAccuracy Accuracy
- }
- type directionRunner struct {
- factory MeasurementClientFactory
- plan directionPlan
- probeBytes int64
- errCh chan error
- errOnce sync.Once
- wg sync.WaitGroup
- totalBytes atomic.Int64
- currentCapacity atomic.Int64
- currentRPM atomic.Int32
- currentInterval atomic.Int64
- connMu sync.Mutex
- connections []*loadConnection
- probeMu sync.Mutex
- probeRounds []probeRound
- intervalProbeValues []float64
- responsivenessWindow *intervalWindow
- throughputs []intervalThroughput
- throughputWindow *intervalWindow
- }
- func newDirectionRunner(factory MeasurementClientFactory, plan directionPlan, probeBytes int64) *directionRunner {
- return &directionRunner{
- factory: factory,
- plan: plan,
- probeBytes: probeBytes,
- errCh: make(chan error, 1),
- }
- }
- func (r *directionRunner) fail(err error) {
- if err == nil {
- return
- }
- r.errOnce.Do(func() {
- select {
- case r.errCh <- err:
- default:
- }
- })
- }
- func (r *directionRunner) onConnectionFailed(err error) {
- r.connMu.Lock()
- activeCount := 0
- for _, conn := range r.connections {
- if conn.active.Load() {
- activeCount++
- }
- }
- r.connMu.Unlock()
- if activeCount == 0 {
- r.fail(err)
- }
- }
- func (r *directionRunner) addConnection(ctx context.Context) error {
- counter := N.CountFunc(func(n int64) { r.totalBytes.Add(n) })
- var readCounters, writeCounters []N.CountFunc
- if r.plan.isUpload {
- writeCounters = []N.CountFunc{counter}
- } else {
- readCounters = []N.CountFunc{counter}
- }
- client, err := r.factory(r.plan.connectEndpoint, true, false, readCounters, writeCounters)
- if err != nil {
- return err
- }
- conn := &loadConnection{
- client: client,
- dataURL: r.plan.dataURL,
- isUpload: r.plan.isUpload,
- }
- r.connMu.Lock()
- r.connections = append(r.connections, conn)
- r.connMu.Unlock()
- r.wg.Add(1)
- go func() {
- defer r.wg.Done()
- conn.run(ctx, r.onConnectionFailed)
- }()
- return nil
- }
- func (r *directionRunner) connectionCount() int {
- r.connMu.Lock()
- defer r.connMu.Unlock()
- return len(r.connections)
- }
- func (r *directionRunner) pickReadyConnection() *loadConnection {
- r.connMu.Lock()
- defer r.connMu.Unlock()
- var ready []*loadConnection
- for _, conn := range r.connections {
- if conn.ready.Load() && conn.active.Load() {
- ready = append(ready, conn)
- }
- }
- if len(ready) == 0 {
- return nil
- }
- return ready[rand.Intn(len(ready))]
- }
- func (r *directionRunner) startProber(ctx context.Context) {
- r.wg.Add(1)
- go func() {
- defer r.wg.Done()
- ticker := time.NewTicker(r.probeInterval())
- defer ticker.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-ticker.C:
- }
- conn := r.pickReadyConnection()
- if conn == nil {
- continue
- }
- go func(selfClient *http.Client) {
- foreignClient, err := r.factory(r.plan.connectEndpoint, true, true, nil, nil)
- if err != nil {
- return
- }
- round, err := collectProbeRound(ctx, foreignClient, selfClient, r.plan.probeURL.String())
- foreignClient.CloseIdleConnections()
- if err != nil {
- return
- }
- r.recordProbeRound(probeRound{
- interval: int(r.currentInterval.Load()),
- tcp: round.tcp,
- tls: round.tls,
- httpFirst: round.httpFirst,
- httpLoaded: round.httpLoaded,
- })
- }(conn.client)
- ticker.Reset(r.probeInterval())
- }
- }()
- }
- func (r *directionRunner) probeInterval() time.Duration {
- interval := time.Second / time.Duration(settings.maxProbesPerSecond)
- capacity := r.currentCapacity.Load()
- if capacity <= 0 || r.probeBytes <= 0 || settings.maxProbeCapacityPct <= 0 {
- return interval
- }
- bitsPerRound := float64(r.probeBytes*2) * 8
- minSeconds := bitsPerRound / (float64(capacity) * settings.maxProbeCapacityPct)
- if minSeconds <= 0 {
- return interval
- }
- capacityInterval := time.Duration(minSeconds * float64(time.Second))
- if capacityInterval > interval {
- interval = capacityInterval
- }
- return interval
- }
- func (r *directionRunner) recordProbeRound(round probeRound) {
- r.probeMu.Lock()
- r.probeRounds = append(r.probeRounds, round)
- if latency := round.responsivenessLatency(); latency > 0 {
- r.intervalProbeValues = append(r.intervalProbeValues, latency)
- }
- r.currentRPM.Store(calculateRPM(r.probeRounds))
- r.probeMu.Unlock()
- }
- func (r *directionRunner) swapIntervalProbeValues() []float64 {
- r.probeMu.Lock()
- defer r.probeMu.Unlock()
- values := append([]float64(nil), r.intervalProbeValues...)
- r.intervalProbeValues = nil
- return values
- }
- func (r *directionRunner) setResponsivenessWindow(currentInterval int) {
- lower := currentInterval - settings.movingAvgDistance + 1
- if lower < 0 {
- lower = 0
- }
- r.probeMu.Lock()
- r.responsivenessWindow = &intervalWindow{lower: lower, upper: currentInterval}
- r.probeMu.Unlock()
- }
- func (r *directionRunner) recordThroughput(interval int, bps float64) {
- r.probeMu.Lock()
- r.throughputs = append(r.throughputs, intervalThroughput{interval: interval, bps: bps})
- r.probeMu.Unlock()
- }
- func (r *directionRunner) setThroughputWindow(currentInterval int) {
- lower := currentInterval - settings.movingAvgDistance + 1
- if lower < 0 {
- lower = 0
- }
- r.probeMu.Lock()
- r.throughputWindow = &intervalWindow{lower: lower, upper: currentInterval}
- r.probeMu.Unlock()
- }
- func (r *directionRunner) finalRPM() int32 {
- r.probeMu.Lock()
- defer r.probeMu.Unlock()
- if r.responsivenessWindow == nil {
- return calculateRPM(r.probeRounds)
- }
- var rounds []probeRound
- for _, round := range r.probeRounds {
- if round.interval >= r.responsivenessWindow.lower && round.interval <= r.responsivenessWindow.upper {
- rounds = append(rounds, round)
- }
- }
- if len(rounds) == 0 {
- rounds = r.probeRounds
- }
- return calculateRPM(rounds)
- }
- func (r *directionRunner) finalCapacity(totalDuration time.Duration) int64 {
- r.probeMu.Lock()
- defer r.probeMu.Unlock()
- var samples []float64
- if r.throughputWindow != nil {
- for _, sample := range r.throughputs {
- if sample.interval >= r.throughputWindow.lower && sample.interval <= r.throughputWindow.upper {
- samples = append(samples, sample.bps)
- }
- }
- }
- if len(samples) == 0 {
- for _, sample := range r.throughputs {
- samples = append(samples, sample.bps)
- }
- }
- if len(samples) > 0 {
- return int64(math.Round(upperTrimmedMean(samples, settings.trimPercent)))
- }
- if totalDuration > 0 {
- return int64(float64(r.totalBytes.Load()) * 8 / totalDuration.Seconds())
- }
- return 0
- }
- func (r *directionRunner) wait() {
- r.wg.Wait()
- }
- func Run(options Options) (*Result, error) {
- ctx := options.Context
- if ctx == nil {
- ctx = context.Background()
- }
- if options.HTTPClient == nil {
- return nil, E.New("http client is required")
- }
- maxRuntime, err := normalizeMaxRuntime(options.MaxRuntime)
- if err != nil {
- return nil, err
- }
- configURL := resolveConfigURL(options.ConfigURL)
- config, err := fetchConfig(ctx, options.HTTPClient, configURL)
- if err != nil {
- return nil, E.Cause(err, "fetch config")
- }
- resolved, err := validateConfig(config)
- if err != nil {
- return nil, E.Cause(err, "validate config")
- }
- start := time.Now()
- report := func(progress Progress) {
- if options.OnProgress == nil {
- return
- }
- progress.ElapsedMs = time.Since(start).Milliseconds()
- options.OnProgress(progress)
- }
- factory := options.NewMeasurementClient
- if factory == nil {
- factory = defaultMeasurementClientFactory(options.HTTPClient)
- }
- report(Progress{Phase: PhaseIdle})
- idleLatency, probeBytes, err := measureIdleLatency(ctx, factory, resolved)
- if err != nil {
- return nil, E.Cause(err, "measure idle latency")
- }
- report(Progress{Phase: PhaseIdle, IdleLatencyMs: idleLatency})
- start = time.Now()
- var download, upload *directionMeasurement
- if options.Serial {
- download, upload, err = measureSerial(
- ctx,
- factory,
- resolved,
- idleLatency,
- probeBytes,
- maxRuntime,
- report,
- )
- } else {
- download, upload, err = measureParallel(
- ctx,
- factory,
- resolved,
- idleLatency,
- probeBytes,
- maxRuntime,
- report,
- )
- }
- if err != nil {
- return nil, err
- }
- result := &Result{
- DownloadCapacity: download.capacity,
- UploadCapacity: upload.capacity,
- DownloadRPM: download.rpm,
- UploadRPM: upload.rpm,
- IdleLatencyMs: idleLatency,
- DownloadCapacityAccuracy: download.capacityAccuracy,
- UploadCapacityAccuracy: upload.capacityAccuracy,
- DownloadRPMAccuracy: download.rpmAccuracy,
- UploadRPMAccuracy: upload.rpmAccuracy,
- }
- report(Progress{
- Phase: PhaseDone,
- DownloadCapacity: result.DownloadCapacity,
- UploadCapacity: result.UploadCapacity,
- DownloadRPM: result.DownloadRPM,
- UploadRPM: result.UploadRPM,
- IdleLatencyMs: result.IdleLatencyMs,
- DownloadCapacityAccuracy: result.DownloadCapacityAccuracy,
- UploadCapacityAccuracy: result.UploadCapacityAccuracy,
- DownloadRPMAccuracy: result.DownloadRPMAccuracy,
- UploadRPMAccuracy: result.UploadRPMAccuracy,
- })
- return result, nil
- }
- func normalizeMaxRuntime(maxRuntime time.Duration) (time.Duration, error) {
- if maxRuntime == 0 {
- return settings.testTimeout, nil
- }
- if maxRuntime < 0 {
- return 0, E.New("max runtime must be positive")
- }
- return maxRuntime, nil
- }
- func measureSerial(
- ctx context.Context,
- factory MeasurementClientFactory,
- resolved *resolvedConfig,
- idleLatency int32,
- probeBytes int64,
- maxRuntime time.Duration,
- report func(Progress),
- ) (*directionMeasurement, *directionMeasurement, error) {
- downloadRuntime, uploadRuntime := splitRuntimeBudget(maxRuntime, 2)
- report(Progress{Phase: PhaseDownload, IdleLatencyMs: idleLatency})
- download, err := measureDirection(ctx, factory, directionPlan{
- dataURL: resolved.largeURL,
- probeURL: resolved.smallURL,
- connectEndpoint: resolved.connectEndpoint,
- }, probeBytes, downloadRuntime, func(capacity int64, rpm int32) {
- report(Progress{
- Phase: PhaseDownload,
- DownloadCapacity: capacity,
- DownloadRPM: rpm,
- IdleLatencyMs: idleLatency,
- })
- })
- if err != nil {
- return nil, nil, E.Cause(err, "measure download")
- }
- report(Progress{
- Phase: PhaseUpload,
- DownloadCapacity: download.capacity,
- DownloadRPM: download.rpm,
- IdleLatencyMs: idleLatency,
- })
- upload, err := measureDirection(ctx, factory, directionPlan{
- dataURL: resolved.uploadURL,
- probeURL: resolved.smallURL,
- connectEndpoint: resolved.connectEndpoint,
- isUpload: true,
- }, probeBytes, uploadRuntime, func(capacity int64, rpm int32) {
- report(Progress{
- Phase: PhaseUpload,
- DownloadCapacity: download.capacity,
- UploadCapacity: capacity,
- DownloadRPM: download.rpm,
- UploadRPM: rpm,
- IdleLatencyMs: idleLatency,
- })
- })
- if err != nil {
- return nil, nil, E.Cause(err, "measure upload")
- }
- return download, upload, nil
- }
- func measureParallel(
- ctx context.Context,
- factory MeasurementClientFactory,
- resolved *resolvedConfig,
- idleLatency int32,
- probeBytes int64,
- maxRuntime time.Duration,
- report func(Progress),
- ) (*directionMeasurement, *directionMeasurement, error) {
- type parallelResult struct {
- measurement *directionMeasurement
- err error
- }
- type progressState struct {
- sync.Mutex
- downloadCapacity int64
- uploadCapacity int64
- downloadRPM int32
- uploadRPM int32
- }
- parallelCtx, cancel := context.WithCancel(ctx)
- defer cancel()
- report(Progress{Phase: PhaseDownload, IdleLatencyMs: idleLatency})
- report(Progress{Phase: PhaseUpload, IdleLatencyMs: idleLatency})
- var state progressState
- sendProgress := func(phase Phase, capacity int64, rpm int32) {
- state.Lock()
- if phase == PhaseDownload {
- state.downloadCapacity = capacity
- state.downloadRPM = rpm
- } else {
- state.uploadCapacity = capacity
- state.uploadRPM = rpm
- }
- snapshot := Progress{
- Phase: phase,
- DownloadCapacity: state.downloadCapacity,
- UploadCapacity: state.uploadCapacity,
- DownloadRPM: state.downloadRPM,
- UploadRPM: state.uploadRPM,
- IdleLatencyMs: idleLatency,
- }
- state.Unlock()
- report(snapshot)
- }
- var wg sync.WaitGroup
- downloadCh := make(chan parallelResult, 1)
- uploadCh := make(chan parallelResult, 1)
- wg.Add(2)
- go func() {
- defer wg.Done()
- measurement, err := measureDirection(parallelCtx, factory, directionPlan{
- dataURL: resolved.largeURL,
- probeURL: resolved.smallURL,
- connectEndpoint: resolved.connectEndpoint,
- }, probeBytes, maxRuntime, func(capacity int64, rpm int32) {
- sendProgress(PhaseDownload, capacity, rpm)
- })
- if err != nil {
- cancel()
- downloadCh <- parallelResult{err: E.Cause(err, "measure download")}
- return
- }
- downloadCh <- parallelResult{measurement: measurement}
- }()
- go func() {
- defer wg.Done()
- measurement, err := measureDirection(parallelCtx, factory, directionPlan{
- dataURL: resolved.uploadURL,
- probeURL: resolved.smallURL,
- connectEndpoint: resolved.connectEndpoint,
- isUpload: true,
- }, probeBytes, maxRuntime, func(capacity int64, rpm int32) {
- sendProgress(PhaseUpload, capacity, rpm)
- })
- if err != nil {
- cancel()
- uploadCh <- parallelResult{err: E.Cause(err, "measure upload")}
- return
- }
- uploadCh <- parallelResult{measurement: measurement}
- }()
- download := <-downloadCh
- upload := <-uploadCh
- wg.Wait()
- if download.err != nil {
- return nil, nil, download.err
- }
- if upload.err != nil {
- return nil, nil, upload.err
- }
- return download.measurement, upload.measurement, nil
- }
- func resolveConfigURL(rawURL string) string {
- rawURL = strings.TrimSpace(rawURL)
- if rawURL == "" {
- return DefaultConfigURL
- }
- if !strings.Contains(rawURL, "://") && !strings.Contains(rawURL, "/") {
- return "https://" + rawURL + "/.well-known/nq"
- }
- parsedURL, err := url.Parse(rawURL)
- if err != nil {
- return rawURL
- }
- if parsedURL.Scheme != "" && parsedURL.Host != "" && (parsedURL.Path == "" || parsedURL.Path == "/") {
- parsedURL.Path = "/.well-known/nq"
- return parsedURL.String()
- }
- return rawURL
- }
- func fetchConfig(ctx context.Context, client *http.Client, configURL string) (*Config, error) {
- req, err := newRequest(ctx, http.MethodGet, configURL, nil)
- if err != nil {
- return nil, err
- }
- resp, err := client.Do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- if err = validateResponse(resp); err != nil {
- return nil, err
- }
- var config Config
- if err = json.NewDecoder(resp.Body).Decode(&config); err != nil {
- return nil, E.Cause(err, "decode config")
- }
- return &config, nil
- }
- func validateConfig(config *Config) (*resolvedConfig, error) {
- if config == nil {
- return nil, E.New("config is nil")
- }
- if config.Version != 1 {
- return nil, E.New("unsupported config version: ", config.Version)
- }
- parseURL := func(name string, rawURL string) (*url.URL, error) {
- if rawURL == "" {
- return nil, E.New("config missing required URL: ", name)
- }
- parsedURL, err := url.Parse(rawURL)
- if err != nil {
- return nil, E.Cause(err, "parse "+name)
- }
- if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
- return nil, E.New("unsupported URL scheme in ", name, ": ", parsedURL.Scheme)
- }
- if parsedURL.Host == "" {
- return nil, E.New("config missing host in ", name)
- }
- return parsedURL, nil
- }
- smallURL, err := parseURL("small_download_url", config.URLs.smallDownloadURL())
- if err != nil {
- return nil, err
- }
- largeURL, err := parseURL("large_download_url", config.URLs.largeDownloadURL())
- if err != nil {
- return nil, err
- }
- uploadURL, err := parseURL("upload_url", config.URLs.uploadURL())
- if err != nil {
- return nil, err
- }
- if smallURL.Host != largeURL.Host || smallURL.Host != uploadURL.Host {
- return nil, E.New("config URLs must use the same host")
- }
- return &resolvedConfig{
- smallURL: smallURL,
- largeURL: largeURL,
- uploadURL: uploadURL,
- connectEndpoint: strings.TrimSpace(config.TestEndpoint),
- }, nil
- }
- func measureIdleLatency(ctx context.Context, factory MeasurementClientFactory, config *resolvedConfig) (int32, int64, error) {
- var latencies []int64
- var maxProbeBytes int64
- for i := 0; i < settings.idleProbeCount; i++ {
- select {
- case <-ctx.Done():
- return 0, 0, ctx.Err()
- default:
- }
- client, err := factory(config.connectEndpoint, true, true, nil, nil)
- if err != nil {
- return 0, 0, err
- }
- measurement, err := runProbe(ctx, client, config.smallURL.String(), false)
- client.CloseIdleConnections()
- if err != nil {
- return 0, 0, err
- }
- latencies = append(latencies, measurement.total.Milliseconds())
- if measurement.bytes > maxProbeBytes {
- maxProbeBytes = measurement.bytes
- }
- }
- sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })
- return int32(latencies[len(latencies)/2]), maxProbeBytes, nil
- }
- func measureDirection(
- ctx context.Context,
- factory MeasurementClientFactory,
- plan directionPlan,
- probeBytes int64,
- maxRuntime time.Duration,
- onProgress func(capacity int64, rpm int32),
- ) (*directionMeasurement, error) {
- phaseCtx, phaseCancel := context.WithTimeout(ctx, maxRuntime)
- defer phaseCancel()
- runner := newDirectionRunner(factory, plan, probeBytes)
- defer runner.wait()
- for i := 0; i < settings.initialConnections; i++ {
- err := runner.addConnection(phaseCtx)
- if err != nil {
- return nil, err
- }
- }
- runner.startProber(phaseCtx)
- throughputTracker := stabilityTracker{
- window: settings.movingAvgDistance,
- stdDevTolerancePct: settings.stdDevTolerancePct,
- }
- responsivenessTracker := stabilityTracker{
- window: settings.movingAvgDistance,
- stdDevTolerancePct: settings.stdDevTolerancePct,
- }
- start := time.Now()
- sampleTicker := time.NewTicker(settings.sampleInterval)
- defer sampleTicker.Stop()
- intervalTicker := time.NewTicker(settings.stabilityInterval)
- defer intervalTicker.Stop()
- progressTicker := time.NewTicker(settings.progressInterval)
- defer progressTicker.Stop()
- prevSampleBytes := int64(0)
- prevSampleTime := start
- prevIntervalBytes := int64(0)
- prevIntervalTime := start
- var ewmaCapacity float64
- var goodputSaturated bool
- var intervalIndex int
- for {
- select {
- case err := <-runner.errCh:
- return nil, err
- case now := <-sampleTicker.C:
- currentBytes := runner.totalBytes.Load()
- elapsed := now.Sub(prevSampleTime).Seconds()
- if elapsed > 0 {
- instantaneousBps := float64(currentBytes-prevSampleBytes) * 8 / elapsed
- if ewmaCapacity == 0 {
- ewmaCapacity = instantaneousBps
- } else {
- ewmaCapacity = 0.3*instantaneousBps + 0.7*ewmaCapacity
- }
- runner.currentCapacity.Store(int64(ewmaCapacity))
- }
- prevSampleBytes = currentBytes
- prevSampleTime = now
- case <-intervalTicker.C:
- now := time.Now()
- currentBytes := runner.totalBytes.Load()
- elapsed := now.Sub(prevIntervalTime).Seconds()
- if elapsed > 0 {
- intervalBps := float64(currentBytes-prevIntervalBytes) * 8 / elapsed
- runner.recordThroughput(intervalIndex, intervalBps)
- throughputStable := throughputTracker.add(intervalBps)
- if throughputStable && runner.throughputWindow == nil {
- runner.setThroughputWindow(intervalIndex)
- }
- if !goodputSaturated && (throughputStable || (runner.connectionCount() >= settings.maxConnections && throughputTracker.ready())) {
- goodputSaturated = true
- }
- if runner.connectionCount() < settings.maxConnections {
- err := runner.addConnection(phaseCtx)
- if err != nil {
- return nil, err
- }
- }
- }
- if goodputSaturated {
- if values := runner.swapIntervalProbeValues(); len(values) > 0 {
- if responsivenessTracker.add(upperTrimmedMean(values, settings.trimPercent)) && runner.responsivenessWindow == nil {
- runner.setResponsivenessWindow(intervalIndex)
- phaseCancel()
- }
- }
- }
- prevIntervalBytes = currentBytes
- prevIntervalTime = now
- intervalIndex++
- runner.currentInterval.Store(int64(intervalIndex))
- case <-progressTicker.C:
- if onProgress != nil {
- onProgress(int64(ewmaCapacity), runner.currentRPM.Load())
- }
- case <-phaseCtx.Done():
- if ctx.Err() != nil {
- return nil, ctx.Err()
- }
- totalDuration := time.Since(start)
- return &directionMeasurement{
- capacity: runner.finalCapacity(totalDuration),
- rpm: runner.finalRPM(),
- capacityAccuracy: throughputTracker.accuracy(),
- rpmAccuracy: responsivenessTracker.accuracy(),
- }, nil
- }
- }
- }
- func splitRuntimeBudget(total time.Duration, directions int) (time.Duration, time.Duration) {
- if directions <= 1 {
- return total, total
- }
- first := total / time.Duration(directions)
- second := total - first
- return first, second
- }
- func collectProbeRound(ctx context.Context, foreignClient *http.Client, selfClient *http.Client, rawURL string) (probeMeasurement, error) {
- var foreignResult probeMeasurement
- var selfResult probeMeasurement
- var foreignErr error
- var selfErr error
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- defer wg.Done()
- foreignResult, foreignErr = runProbe(ctx, foreignClient, rawURL, false)
- }()
- go func() {
- defer wg.Done()
- selfResult, selfErr = runProbe(ctx, selfClient, rawURL, true)
- }()
- wg.Wait()
- if foreignErr != nil {
- return probeMeasurement{}, E.Cause(foreignErr, "foreign probe")
- }
- if selfErr != nil {
- return probeMeasurement{}, E.Cause(selfErr, "self probe")
- }
- return probeMeasurement{
- tcp: foreignResult.tcp,
- tls: foreignResult.tls,
- httpFirst: foreignResult.httpFirst,
- httpLoaded: selfResult.httpLoaded,
- }, nil
- }
- func runProbe(ctx context.Context, client *http.Client, rawURL string, expectReuse bool) (probeMeasurement, error) {
- var trace probeTrace
- start := time.Now()
- req, err := newRequest(httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
- ConnectStart: func(string, string) {
- if trace.connectStart.IsZero() {
- trace.connectStart = time.Now()
- }
- },
- ConnectDone: func(string, string, error) {
- if trace.connectDone.IsZero() {
- trace.connectDone = time.Now()
- }
- },
- TLSHandshakeStart: func() {
- if trace.tlsStart.IsZero() {
- trace.tlsStart = time.Now()
- }
- },
- TLSHandshakeDone: func(state tls.ConnectionState, _ error) {
- if trace.tlsDone.IsZero() {
- trace.tlsDone = time.Now()
- trace.tlsVersion = state.Version
- }
- },
- GotConn: func(info httptrace.GotConnInfo) {
- trace.reused = info.Reused
- trace.gotConn = time.Now()
- },
- WroteRequest: func(httptrace.WroteRequestInfo) {
- trace.wroteRequest = time.Now()
- },
- GotFirstResponseByte: func() {
- trace.firstResponseByte = time.Now()
- },
- }), http.MethodGet, rawURL, nil)
- if err != nil {
- return probeMeasurement{}, err
- }
- if !expectReuse {
- req.Close = true
- }
- resp, err := client.Do(req)
- if err != nil {
- return probeMeasurement{}, err
- }
- defer resp.Body.Close()
- if err = validateResponse(resp); err != nil {
- return probeMeasurement{}, err
- }
- n, err := io.Copy(io.Discard, resp.Body)
- end := time.Now()
- if err != nil {
- return probeMeasurement{}, err
- }
- if expectReuse && !trace.reused {
- return probeMeasurement{}, E.New("self probe did not reuse an existing connection")
- }
- httpStart := trace.wroteRequest
- if httpStart.IsZero() {
- switch {
- case !trace.tlsDone.IsZero():
- httpStart = trace.tlsDone
- case !trace.connectDone.IsZero():
- httpStart = trace.connectDone
- case !trace.gotConn.IsZero():
- httpStart = trace.gotConn
- default:
- httpStart = start
- }
- }
- measurement := probeMeasurement{
- total: end.Sub(start),
- bytes: n,
- reused: trace.reused,
- }
- if !trace.connectStart.IsZero() && !trace.connectDone.IsZero() && trace.connectDone.After(trace.connectStart) {
- measurement.tcp = trace.connectDone.Sub(trace.connectStart)
- }
- if !trace.tlsStart.IsZero() && !trace.tlsDone.IsZero() && trace.tlsDone.After(trace.tlsStart) {
- measurement.tls = trace.tlsDone.Sub(trace.tlsStart)
- if roundTrips := tlsHandshakeRoundTrips(trace.tlsVersion); roundTrips > 1 {
- measurement.tls /= time.Duration(roundTrips)
- }
- }
- if !trace.firstResponseByte.IsZero() && trace.firstResponseByte.After(httpStart) {
- measurement.httpFirst = trace.firstResponseByte.Sub(httpStart)
- }
- if end.After(httpStart) {
- measurement.httpLoaded = end.Sub(httpStart)
- }
- return measurement, nil
- }
- func runDownloadRequest(ctx context.Context, client *http.Client, rawURL string, onActive func()) error {
- req, err := newRequest(ctx, http.MethodGet, rawURL, nil)
- if err != nil {
- return err
- }
- resp, err := client.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
- err = validateResponse(resp)
- if err != nil {
- return err
- }
- if onActive != nil {
- onActive()
- }
- _, err = sBufio.Copy(io.Discard, resp.Body)
- if ctx.Err() != nil {
- return nil
- }
- return err
- }
- func runUploadRequest(ctx context.Context, client *http.Client, rawURL string, onActive func()) error {
- body := &uploadBody{
- ctx: ctx,
- onActive: onActive,
- }
- req, err := newRequest(ctx, http.MethodPost, rawURL, body)
- if err != nil {
- return err
- }
- req.ContentLength = -1
- req.Header.Set("Content-Type", "application/octet-stream")
- resp, err := client.Do(req)
- if err != nil {
- if ctx.Err() != nil {
- return nil
- }
- return err
- }
- defer resp.Body.Close()
- err = validateResponse(resp)
- if err != nil {
- return err
- }
- _, _ = io.Copy(io.Discard, resp.Body)
- <-ctx.Done()
- return nil
- }
- func newRequest(ctx context.Context, method string, rawURL string, body io.Reader) (*http.Request, error) {
- req, err := http.NewRequestWithContext(ctx, method, rawURL, body)
- if err != nil {
- return nil, err
- }
- req.Header.Set("Accept-Encoding", "identity")
- return req, nil
- }
- func validateResponse(resp *http.Response) error {
- if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- return E.New("unexpected status: ", resp.Status)
- }
- if encoding := resp.Header.Get("Content-Encoding"); encoding != "" {
- return E.New("unexpected content encoding: ", encoding)
- }
- return nil
- }
- func calculateRPM(rounds []probeRound) int32 {
- if len(rounds) == 0 {
- return 0
- }
- var tcpSamples []float64
- var tlsSamples []float64
- var httpFirstSamples []float64
- var httpLoadedSamples []float64
- for _, round := range rounds {
- if round.tcp > 0 {
- tcpSamples = append(tcpSamples, durationMillis(round.tcp))
- }
- if round.tls > 0 {
- tlsSamples = append(tlsSamples, durationMillis(round.tls))
- }
- if round.httpFirst > 0 {
- httpFirstSamples = append(httpFirstSamples, durationMillis(round.httpFirst))
- }
- if round.httpLoaded > 0 {
- httpLoadedSamples = append(httpLoadedSamples, durationMillis(round.httpLoaded))
- }
- }
- httpLoaded := upperTrimmedMean(httpLoadedSamples, settings.trimPercent)
- if httpLoaded <= 0 {
- return 0
- }
- var foreignComponents []float64
- if tcp := upperTrimmedMean(tcpSamples, settings.trimPercent); tcp > 0 {
- foreignComponents = append(foreignComponents, tcp)
- }
- if tls := upperTrimmedMean(tlsSamples, settings.trimPercent); tls > 0 {
- foreignComponents = append(foreignComponents, tls)
- }
- if httpFirst := upperTrimmedMean(httpFirstSamples, settings.trimPercent); httpFirst > 0 {
- foreignComponents = append(foreignComponents, httpFirst)
- }
- if len(foreignComponents) == 0 {
- return 0
- }
- foreignLatency := meanFloat64s(foreignComponents)
- foreignRPM := 60000.0 / foreignLatency
- loadedRPM := 60000.0 / httpLoaded
- return int32(math.Round((foreignRPM + loadedRPM) / 2))
- }
- func tlsHandshakeRoundTrips(version uint16) int {
- switch version {
- case tls.VersionTLS12, tls.VersionTLS11, tls.VersionTLS10:
- return 2
- default:
- return 1
- }
- }
- func durationMillis(value time.Duration) float64 {
- return float64(value) / float64(time.Millisecond)
- }
- func upperTrimmedMean(values []float64, trimPercent int) float64 {
- trimmed := upperTrimFloat64s(values, trimPercent)
- if len(trimmed) == 0 {
- return 0
- }
- return meanFloat64s(trimmed)
- }
- func upperTrimFloat64s(values []float64, trimPercent int) []float64 {
- if len(values) == 0 {
- return nil
- }
- trimmed := append([]float64(nil), values...)
- sort.Float64s(trimmed)
- if trimPercent <= 0 {
- return trimmed
- }
- trimCount := int(math.Floor(float64(len(trimmed)) * float64(trimPercent) / 100))
- if trimCount <= 0 || trimCount >= len(trimmed) {
- return trimmed
- }
- return trimmed[:len(trimmed)-trimCount]
- }
- func meanFloat64s(values []float64) float64 {
- if len(values) == 0 {
- return 0
- }
- var total float64
- for _, value := range values {
- total += value
- }
- return total / float64(len(values))
- }
- func stdDevFloat64s(values []float64) float64 {
- if len(values) == 0 {
- return 0
- }
- mean := meanFloat64s(values)
- var total float64
- for _, value := range values {
- delta := value - mean
- total += delta * delta
- }
- return math.Sqrt(total / float64(len(values)))
- }
- type uploadBody struct {
- ctx context.Context
- activated atomic.Bool
- onActive func()
- }
- func (u *uploadBody) Read(p []byte) (int, error) {
- if err := u.ctx.Err(); err != nil {
- return 0, err
- }
- clear(p)
- n := len(p)
- if n > 0 && u.onActive != nil && u.activated.CompareAndSwap(false, true) {
- u.onActive()
- }
- return n, nil
- }
- func (u *uploadBody) Close() error {
- return nil
- }
|