networkquality.go 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413
  1. package networkquality
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/json"
  6. "io"
  7. "math"
  8. "math/rand"
  9. "net/http"
  10. "net/http/httptrace"
  11. "net/url"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. sBufio "github.com/sagernet/sing/common/bufio"
  18. E "github.com/sagernet/sing/common/exceptions"
  19. N "github.com/sagernet/sing/common/network"
  20. )
  21. const DefaultConfigURL = "https://mensura.cdn-apple.com/api/v1/gm/config"
  22. type Config struct {
  23. Version int `json:"version"`
  24. TestEndpoint string `json:"test_endpoint"`
  25. URLs URLs `json:"urls"`
  26. }
  27. type URLs struct {
  28. SmallHTTPSDownloadURL string `json:"small_https_download_url"`
  29. LargeHTTPSDownloadURL string `json:"large_https_download_url"`
  30. HTTPSUploadURL string `json:"https_upload_url"`
  31. SmallDownloadURL string `json:"small_download_url"`
  32. LargeDownloadURL string `json:"large_download_url"`
  33. UploadURL string `json:"upload_url"`
  34. }
  35. func (u *URLs) smallDownloadURL() string {
  36. if u.SmallHTTPSDownloadURL != "" {
  37. return u.SmallHTTPSDownloadURL
  38. }
  39. return u.SmallDownloadURL
  40. }
  41. func (u *URLs) largeDownloadURL() string {
  42. if u.LargeHTTPSDownloadURL != "" {
  43. return u.LargeHTTPSDownloadURL
  44. }
  45. return u.LargeDownloadURL
  46. }
  47. func (u *URLs) uploadURL() string {
  48. if u.HTTPSUploadURL != "" {
  49. return u.HTTPSUploadURL
  50. }
  51. return u.UploadURL
  52. }
  53. type Accuracy int32
  54. const (
  55. AccuracyLow Accuracy = 0
  56. AccuracyMedium Accuracy = 1
  57. AccuracyHigh Accuracy = 2
  58. )
  59. func (a Accuracy) String() string {
  60. switch a {
  61. case AccuracyHigh:
  62. return "High"
  63. case AccuracyMedium:
  64. return "Medium"
  65. default:
  66. return "Low"
  67. }
  68. }
  69. type Result struct {
  70. DownloadCapacity int64
  71. UploadCapacity int64
  72. DownloadRPM int32
  73. UploadRPM int32
  74. IdleLatencyMs int32
  75. DownloadCapacityAccuracy Accuracy
  76. UploadCapacityAccuracy Accuracy
  77. DownloadRPMAccuracy Accuracy
  78. UploadRPMAccuracy Accuracy
  79. }
  80. type Progress struct {
  81. Phase Phase
  82. DownloadCapacity int64
  83. UploadCapacity int64
  84. DownloadRPM int32
  85. UploadRPM int32
  86. IdleLatencyMs int32
  87. ElapsedMs int64
  88. DownloadCapacityAccuracy Accuracy
  89. UploadCapacityAccuracy Accuracy
  90. DownloadRPMAccuracy Accuracy
  91. UploadRPMAccuracy Accuracy
  92. }
  93. type Phase int32
  94. const (
  95. PhaseIdle Phase = 0
  96. PhaseDownload Phase = 1
  97. PhaseUpload Phase = 2
  98. PhaseDone Phase = 3
  99. )
  100. type Options struct {
  101. ConfigURL string
  102. HTTPClient *http.Client
  103. NewMeasurementClient MeasurementClientFactory
  104. Serial bool
  105. MaxRuntime time.Duration
  106. OnProgress func(Progress)
  107. Context context.Context
  108. }
  109. const DefaultMaxRuntime = 20 * time.Second
  110. type measurementSettings struct {
  111. idleProbeCount int
  112. testTimeout time.Duration
  113. stabilityInterval time.Duration
  114. sampleInterval time.Duration
  115. progressInterval time.Duration
  116. maxProbesPerSecond int
  117. initialConnections int
  118. maxConnections int
  119. movingAvgDistance int
  120. trimPercent int
  121. stdDevTolerancePct float64
  122. maxProbeCapacityPct float64
  123. }
  124. var settings = measurementSettings{
  125. idleProbeCount: 5,
  126. testTimeout: DefaultMaxRuntime,
  127. stabilityInterval: time.Second,
  128. sampleInterval: 250 * time.Millisecond,
  129. progressInterval: 500 * time.Millisecond,
  130. maxProbesPerSecond: 100,
  131. initialConnections: 1,
  132. maxConnections: 16,
  133. movingAvgDistance: 4,
  134. trimPercent: 5,
  135. stdDevTolerancePct: 5,
  136. maxProbeCapacityPct: 0.05,
  137. }
  138. type resolvedConfig struct {
  139. smallURL *url.URL
  140. largeURL *url.URL
  141. uploadURL *url.URL
  142. connectEndpoint string
  143. }
  144. type directionPlan struct {
  145. dataURL *url.URL
  146. probeURL *url.URL
  147. connectEndpoint string
  148. isUpload bool
  149. }
  150. type probeTrace struct {
  151. reused bool
  152. connectStart time.Time
  153. connectDone time.Time
  154. tlsStart time.Time
  155. tlsDone time.Time
  156. tlsVersion uint16
  157. gotConn time.Time
  158. wroteRequest time.Time
  159. firstResponseByte time.Time
  160. }
  161. type probeMeasurement struct {
  162. total time.Duration
  163. tcp time.Duration
  164. tls time.Duration
  165. httpFirst time.Duration
  166. httpLoaded time.Duration
  167. bytes int64
  168. reused bool
  169. }
  170. type probeRound struct {
  171. interval int
  172. tcp time.Duration
  173. tls time.Duration
  174. httpFirst time.Duration
  175. httpLoaded time.Duration
  176. }
  177. func (p probeRound) responsivenessLatency() float64 {
  178. var foreignSamples []float64
  179. if p.tcp > 0 {
  180. foreignSamples = append(foreignSamples, durationMillis(p.tcp))
  181. }
  182. if p.tls > 0 {
  183. foreignSamples = append(foreignSamples, durationMillis(p.tls))
  184. }
  185. if p.httpFirst > 0 {
  186. foreignSamples = append(foreignSamples, durationMillis(p.httpFirst))
  187. }
  188. if len(foreignSamples) == 0 || p.httpLoaded <= 0 {
  189. return 0
  190. }
  191. return (meanFloat64s(foreignSamples) + durationMillis(p.httpLoaded)) / 2
  192. }
  193. const maxConsecutiveErrors = 3
  194. type loadConnection struct {
  195. client *http.Client
  196. dataURL *url.URL
  197. isUpload bool
  198. active atomic.Bool
  199. ready atomic.Bool
  200. }
  201. func (c *loadConnection) run(ctx context.Context, onError func(error)) {
  202. defer c.client.CloseIdleConnections()
  203. markActive := func() {
  204. c.ready.Store(true)
  205. c.active.Store(true)
  206. }
  207. var consecutiveErrors int
  208. for {
  209. select {
  210. case <-ctx.Done():
  211. return
  212. default:
  213. }
  214. var err error
  215. if c.isUpload {
  216. err = runUploadRequest(ctx, c.client, c.dataURL.String(), markActive)
  217. } else {
  218. err = runDownloadRequest(ctx, c.client, c.dataURL.String(), markActive)
  219. }
  220. c.active.Store(false)
  221. if err != nil {
  222. if ctx.Err() != nil {
  223. return
  224. }
  225. consecutiveErrors++
  226. if consecutiveErrors > maxConsecutiveErrors {
  227. onError(err)
  228. return
  229. }
  230. c.client.CloseIdleConnections()
  231. continue
  232. }
  233. consecutiveErrors = 0
  234. }
  235. }
  236. type intervalThroughput struct {
  237. interval int
  238. bps float64
  239. }
  240. type intervalWindow struct {
  241. lower int
  242. upper int
  243. }
  244. type stabilityTracker struct {
  245. window int
  246. stdDevTolerancePct float64
  247. instantaneous []float64
  248. movingAverages []float64
  249. }
  250. func (s *stabilityTracker) add(value float64) bool {
  251. if value <= 0 || math.IsNaN(value) || math.IsInf(value, 0) {
  252. return false
  253. }
  254. s.instantaneous = append(s.instantaneous, value)
  255. if len(s.instantaneous) > s.window {
  256. s.instantaneous = s.instantaneous[len(s.instantaneous)-s.window:]
  257. }
  258. s.movingAverages = append(s.movingAverages, meanFloat64s(s.instantaneous))
  259. if len(s.movingAverages) > s.window {
  260. s.movingAverages = s.movingAverages[len(s.movingAverages)-s.window:]
  261. }
  262. return s.stable()
  263. }
  264. func (s *stabilityTracker) ready() bool {
  265. return len(s.movingAverages) >= s.window
  266. }
  267. func (s *stabilityTracker) accuracy() Accuracy {
  268. if s.stable() {
  269. return AccuracyHigh
  270. }
  271. if s.ready() {
  272. return AccuracyMedium
  273. }
  274. return AccuracyLow
  275. }
  276. func (s *stabilityTracker) stable() bool {
  277. if len(s.movingAverages) < s.window {
  278. return false
  279. }
  280. currentAverage := s.movingAverages[len(s.movingAverages)-1]
  281. if currentAverage <= 0 {
  282. return false
  283. }
  284. return stdDevFloat64s(s.movingAverages) <= currentAverage*(s.stdDevTolerancePct/100)
  285. }
  286. type directionMeasurement struct {
  287. capacity int64
  288. rpm int32
  289. capacityAccuracy Accuracy
  290. rpmAccuracy Accuracy
  291. }
  292. type directionRunner struct {
  293. factory MeasurementClientFactory
  294. plan directionPlan
  295. probeBytes int64
  296. errCh chan error
  297. errOnce sync.Once
  298. wg sync.WaitGroup
  299. totalBytes atomic.Int64
  300. currentCapacity atomic.Int64
  301. currentRPM atomic.Int32
  302. currentInterval atomic.Int64
  303. connMu sync.Mutex
  304. connections []*loadConnection
  305. probeMu sync.Mutex
  306. probeRounds []probeRound
  307. intervalProbeValues []float64
  308. responsivenessWindow *intervalWindow
  309. throughputs []intervalThroughput
  310. throughputWindow *intervalWindow
  311. }
  312. func newDirectionRunner(factory MeasurementClientFactory, plan directionPlan, probeBytes int64) *directionRunner {
  313. return &directionRunner{
  314. factory: factory,
  315. plan: plan,
  316. probeBytes: probeBytes,
  317. errCh: make(chan error, 1),
  318. }
  319. }
  320. func (r *directionRunner) fail(err error) {
  321. if err == nil {
  322. return
  323. }
  324. r.errOnce.Do(func() {
  325. select {
  326. case r.errCh <- err:
  327. default:
  328. }
  329. })
  330. }
  331. func (r *directionRunner) onConnectionFailed(err error) {
  332. r.connMu.Lock()
  333. activeCount := 0
  334. for _, conn := range r.connections {
  335. if conn.active.Load() {
  336. activeCount++
  337. }
  338. }
  339. r.connMu.Unlock()
  340. if activeCount == 0 {
  341. r.fail(err)
  342. }
  343. }
  344. func (r *directionRunner) addConnection(ctx context.Context) error {
  345. counter := N.CountFunc(func(n int64) { r.totalBytes.Add(n) })
  346. var readCounters, writeCounters []N.CountFunc
  347. if r.plan.isUpload {
  348. writeCounters = []N.CountFunc{counter}
  349. } else {
  350. readCounters = []N.CountFunc{counter}
  351. }
  352. client, err := r.factory(r.plan.connectEndpoint, true, false, readCounters, writeCounters)
  353. if err != nil {
  354. return err
  355. }
  356. conn := &loadConnection{
  357. client: client,
  358. dataURL: r.plan.dataURL,
  359. isUpload: r.plan.isUpload,
  360. }
  361. r.connMu.Lock()
  362. r.connections = append(r.connections, conn)
  363. r.connMu.Unlock()
  364. r.wg.Add(1)
  365. go func() {
  366. defer r.wg.Done()
  367. conn.run(ctx, r.onConnectionFailed)
  368. }()
  369. return nil
  370. }
  371. func (r *directionRunner) connectionCount() int {
  372. r.connMu.Lock()
  373. defer r.connMu.Unlock()
  374. return len(r.connections)
  375. }
  376. func (r *directionRunner) pickReadyConnection() *loadConnection {
  377. r.connMu.Lock()
  378. defer r.connMu.Unlock()
  379. var ready []*loadConnection
  380. for _, conn := range r.connections {
  381. if conn.ready.Load() && conn.active.Load() {
  382. ready = append(ready, conn)
  383. }
  384. }
  385. if len(ready) == 0 {
  386. return nil
  387. }
  388. return ready[rand.Intn(len(ready))]
  389. }
  390. func (r *directionRunner) startProber(ctx context.Context) {
  391. r.wg.Add(1)
  392. go func() {
  393. defer r.wg.Done()
  394. ticker := time.NewTicker(r.probeInterval())
  395. defer ticker.Stop()
  396. for {
  397. select {
  398. case <-ctx.Done():
  399. return
  400. case <-ticker.C:
  401. }
  402. conn := r.pickReadyConnection()
  403. if conn == nil {
  404. continue
  405. }
  406. go func(selfClient *http.Client) {
  407. foreignClient, err := r.factory(r.plan.connectEndpoint, true, true, nil, nil)
  408. if err != nil {
  409. return
  410. }
  411. round, err := collectProbeRound(ctx, foreignClient, selfClient, r.plan.probeURL.String())
  412. foreignClient.CloseIdleConnections()
  413. if err != nil {
  414. return
  415. }
  416. r.recordProbeRound(probeRound{
  417. interval: int(r.currentInterval.Load()),
  418. tcp: round.tcp,
  419. tls: round.tls,
  420. httpFirst: round.httpFirst,
  421. httpLoaded: round.httpLoaded,
  422. })
  423. }(conn.client)
  424. ticker.Reset(r.probeInterval())
  425. }
  426. }()
  427. }
  428. func (r *directionRunner) probeInterval() time.Duration {
  429. interval := time.Second / time.Duration(settings.maxProbesPerSecond)
  430. capacity := r.currentCapacity.Load()
  431. if capacity <= 0 || r.probeBytes <= 0 || settings.maxProbeCapacityPct <= 0 {
  432. return interval
  433. }
  434. bitsPerRound := float64(r.probeBytes*2) * 8
  435. minSeconds := bitsPerRound / (float64(capacity) * settings.maxProbeCapacityPct)
  436. if minSeconds <= 0 {
  437. return interval
  438. }
  439. capacityInterval := time.Duration(minSeconds * float64(time.Second))
  440. if capacityInterval > interval {
  441. interval = capacityInterval
  442. }
  443. return interval
  444. }
  445. func (r *directionRunner) recordProbeRound(round probeRound) {
  446. r.probeMu.Lock()
  447. r.probeRounds = append(r.probeRounds, round)
  448. if latency := round.responsivenessLatency(); latency > 0 {
  449. r.intervalProbeValues = append(r.intervalProbeValues, latency)
  450. }
  451. r.currentRPM.Store(calculateRPM(r.probeRounds))
  452. r.probeMu.Unlock()
  453. }
  454. func (r *directionRunner) swapIntervalProbeValues() []float64 {
  455. r.probeMu.Lock()
  456. defer r.probeMu.Unlock()
  457. values := append([]float64(nil), r.intervalProbeValues...)
  458. r.intervalProbeValues = nil
  459. return values
  460. }
  461. func (r *directionRunner) setResponsivenessWindow(currentInterval int) {
  462. lower := currentInterval - settings.movingAvgDistance + 1
  463. if lower < 0 {
  464. lower = 0
  465. }
  466. r.probeMu.Lock()
  467. r.responsivenessWindow = &intervalWindow{lower: lower, upper: currentInterval}
  468. r.probeMu.Unlock()
  469. }
  470. func (r *directionRunner) recordThroughput(interval int, bps float64) {
  471. r.probeMu.Lock()
  472. r.throughputs = append(r.throughputs, intervalThroughput{interval: interval, bps: bps})
  473. r.probeMu.Unlock()
  474. }
  475. func (r *directionRunner) setThroughputWindow(currentInterval int) {
  476. lower := currentInterval - settings.movingAvgDistance + 1
  477. if lower < 0 {
  478. lower = 0
  479. }
  480. r.probeMu.Lock()
  481. r.throughputWindow = &intervalWindow{lower: lower, upper: currentInterval}
  482. r.probeMu.Unlock()
  483. }
  484. func (r *directionRunner) finalRPM() int32 {
  485. r.probeMu.Lock()
  486. defer r.probeMu.Unlock()
  487. if r.responsivenessWindow == nil {
  488. return calculateRPM(r.probeRounds)
  489. }
  490. var rounds []probeRound
  491. for _, round := range r.probeRounds {
  492. if round.interval >= r.responsivenessWindow.lower && round.interval <= r.responsivenessWindow.upper {
  493. rounds = append(rounds, round)
  494. }
  495. }
  496. if len(rounds) == 0 {
  497. rounds = r.probeRounds
  498. }
  499. return calculateRPM(rounds)
  500. }
  501. func (r *directionRunner) finalCapacity(totalDuration time.Duration) int64 {
  502. r.probeMu.Lock()
  503. defer r.probeMu.Unlock()
  504. var samples []float64
  505. if r.throughputWindow != nil {
  506. for _, sample := range r.throughputs {
  507. if sample.interval >= r.throughputWindow.lower && sample.interval <= r.throughputWindow.upper {
  508. samples = append(samples, sample.bps)
  509. }
  510. }
  511. }
  512. if len(samples) == 0 {
  513. for _, sample := range r.throughputs {
  514. samples = append(samples, sample.bps)
  515. }
  516. }
  517. if len(samples) > 0 {
  518. return int64(math.Round(upperTrimmedMean(samples, settings.trimPercent)))
  519. }
  520. if totalDuration > 0 {
  521. return int64(float64(r.totalBytes.Load()) * 8 / totalDuration.Seconds())
  522. }
  523. return 0
  524. }
  525. func (r *directionRunner) wait() {
  526. r.wg.Wait()
  527. }
  528. func Run(options Options) (*Result, error) {
  529. ctx := options.Context
  530. if ctx == nil {
  531. ctx = context.Background()
  532. }
  533. if options.HTTPClient == nil {
  534. return nil, E.New("http client is required")
  535. }
  536. maxRuntime, err := normalizeMaxRuntime(options.MaxRuntime)
  537. if err != nil {
  538. return nil, err
  539. }
  540. configURL := resolveConfigURL(options.ConfigURL)
  541. config, err := fetchConfig(ctx, options.HTTPClient, configURL)
  542. if err != nil {
  543. return nil, E.Cause(err, "fetch config")
  544. }
  545. resolved, err := validateConfig(config)
  546. if err != nil {
  547. return nil, E.Cause(err, "validate config")
  548. }
  549. start := time.Now()
  550. report := func(progress Progress) {
  551. if options.OnProgress == nil {
  552. return
  553. }
  554. progress.ElapsedMs = time.Since(start).Milliseconds()
  555. options.OnProgress(progress)
  556. }
  557. factory := options.NewMeasurementClient
  558. if factory == nil {
  559. factory = defaultMeasurementClientFactory(options.HTTPClient)
  560. }
  561. report(Progress{Phase: PhaseIdle})
  562. idleLatency, probeBytes, err := measureIdleLatency(ctx, factory, resolved)
  563. if err != nil {
  564. return nil, E.Cause(err, "measure idle latency")
  565. }
  566. report(Progress{Phase: PhaseIdle, IdleLatencyMs: idleLatency})
  567. start = time.Now()
  568. var download, upload *directionMeasurement
  569. if options.Serial {
  570. download, upload, err = measureSerial(
  571. ctx,
  572. factory,
  573. resolved,
  574. idleLatency,
  575. probeBytes,
  576. maxRuntime,
  577. report,
  578. )
  579. } else {
  580. download, upload, err = measureParallel(
  581. ctx,
  582. factory,
  583. resolved,
  584. idleLatency,
  585. probeBytes,
  586. maxRuntime,
  587. report,
  588. )
  589. }
  590. if err != nil {
  591. return nil, err
  592. }
  593. result := &Result{
  594. DownloadCapacity: download.capacity,
  595. UploadCapacity: upload.capacity,
  596. DownloadRPM: download.rpm,
  597. UploadRPM: upload.rpm,
  598. IdleLatencyMs: idleLatency,
  599. DownloadCapacityAccuracy: download.capacityAccuracy,
  600. UploadCapacityAccuracy: upload.capacityAccuracy,
  601. DownloadRPMAccuracy: download.rpmAccuracy,
  602. UploadRPMAccuracy: upload.rpmAccuracy,
  603. }
  604. report(Progress{
  605. Phase: PhaseDone,
  606. DownloadCapacity: result.DownloadCapacity,
  607. UploadCapacity: result.UploadCapacity,
  608. DownloadRPM: result.DownloadRPM,
  609. UploadRPM: result.UploadRPM,
  610. IdleLatencyMs: result.IdleLatencyMs,
  611. DownloadCapacityAccuracy: result.DownloadCapacityAccuracy,
  612. UploadCapacityAccuracy: result.UploadCapacityAccuracy,
  613. DownloadRPMAccuracy: result.DownloadRPMAccuracy,
  614. UploadRPMAccuracy: result.UploadRPMAccuracy,
  615. })
  616. return result, nil
  617. }
  618. func normalizeMaxRuntime(maxRuntime time.Duration) (time.Duration, error) {
  619. if maxRuntime == 0 {
  620. return settings.testTimeout, nil
  621. }
  622. if maxRuntime < 0 {
  623. return 0, E.New("max runtime must be positive")
  624. }
  625. return maxRuntime, nil
  626. }
  627. func measureSerial(
  628. ctx context.Context,
  629. factory MeasurementClientFactory,
  630. resolved *resolvedConfig,
  631. idleLatency int32,
  632. probeBytes int64,
  633. maxRuntime time.Duration,
  634. report func(Progress),
  635. ) (*directionMeasurement, *directionMeasurement, error) {
  636. downloadRuntime, uploadRuntime := splitRuntimeBudget(maxRuntime, 2)
  637. report(Progress{Phase: PhaseDownload, IdleLatencyMs: idleLatency})
  638. download, err := measureDirection(ctx, factory, directionPlan{
  639. dataURL: resolved.largeURL,
  640. probeURL: resolved.smallURL,
  641. connectEndpoint: resolved.connectEndpoint,
  642. }, probeBytes, downloadRuntime, func(capacity int64, rpm int32) {
  643. report(Progress{
  644. Phase: PhaseDownload,
  645. DownloadCapacity: capacity,
  646. DownloadRPM: rpm,
  647. IdleLatencyMs: idleLatency,
  648. })
  649. })
  650. if err != nil {
  651. return nil, nil, E.Cause(err, "measure download")
  652. }
  653. report(Progress{
  654. Phase: PhaseUpload,
  655. DownloadCapacity: download.capacity,
  656. DownloadRPM: download.rpm,
  657. IdleLatencyMs: idleLatency,
  658. })
  659. upload, err := measureDirection(ctx, factory, directionPlan{
  660. dataURL: resolved.uploadURL,
  661. probeURL: resolved.smallURL,
  662. connectEndpoint: resolved.connectEndpoint,
  663. isUpload: true,
  664. }, probeBytes, uploadRuntime, func(capacity int64, rpm int32) {
  665. report(Progress{
  666. Phase: PhaseUpload,
  667. DownloadCapacity: download.capacity,
  668. UploadCapacity: capacity,
  669. DownloadRPM: download.rpm,
  670. UploadRPM: rpm,
  671. IdleLatencyMs: idleLatency,
  672. })
  673. })
  674. if err != nil {
  675. return nil, nil, E.Cause(err, "measure upload")
  676. }
  677. return download, upload, nil
  678. }
  679. func measureParallel(
  680. ctx context.Context,
  681. factory MeasurementClientFactory,
  682. resolved *resolvedConfig,
  683. idleLatency int32,
  684. probeBytes int64,
  685. maxRuntime time.Duration,
  686. report func(Progress),
  687. ) (*directionMeasurement, *directionMeasurement, error) {
  688. type parallelResult struct {
  689. measurement *directionMeasurement
  690. err error
  691. }
  692. type progressState struct {
  693. sync.Mutex
  694. downloadCapacity int64
  695. uploadCapacity int64
  696. downloadRPM int32
  697. uploadRPM int32
  698. }
  699. parallelCtx, cancel := context.WithCancel(ctx)
  700. defer cancel()
  701. report(Progress{Phase: PhaseDownload, IdleLatencyMs: idleLatency})
  702. report(Progress{Phase: PhaseUpload, IdleLatencyMs: idleLatency})
  703. var state progressState
  704. sendProgress := func(phase Phase, capacity int64, rpm int32) {
  705. state.Lock()
  706. if phase == PhaseDownload {
  707. state.downloadCapacity = capacity
  708. state.downloadRPM = rpm
  709. } else {
  710. state.uploadCapacity = capacity
  711. state.uploadRPM = rpm
  712. }
  713. snapshot := Progress{
  714. Phase: phase,
  715. DownloadCapacity: state.downloadCapacity,
  716. UploadCapacity: state.uploadCapacity,
  717. DownloadRPM: state.downloadRPM,
  718. UploadRPM: state.uploadRPM,
  719. IdleLatencyMs: idleLatency,
  720. }
  721. state.Unlock()
  722. report(snapshot)
  723. }
  724. var wg sync.WaitGroup
  725. downloadCh := make(chan parallelResult, 1)
  726. uploadCh := make(chan parallelResult, 1)
  727. wg.Add(2)
  728. go func() {
  729. defer wg.Done()
  730. measurement, err := measureDirection(parallelCtx, factory, directionPlan{
  731. dataURL: resolved.largeURL,
  732. probeURL: resolved.smallURL,
  733. connectEndpoint: resolved.connectEndpoint,
  734. }, probeBytes, maxRuntime, func(capacity int64, rpm int32) {
  735. sendProgress(PhaseDownload, capacity, rpm)
  736. })
  737. if err != nil {
  738. cancel()
  739. downloadCh <- parallelResult{err: E.Cause(err, "measure download")}
  740. return
  741. }
  742. downloadCh <- parallelResult{measurement: measurement}
  743. }()
  744. go func() {
  745. defer wg.Done()
  746. measurement, err := measureDirection(parallelCtx, factory, directionPlan{
  747. dataURL: resolved.uploadURL,
  748. probeURL: resolved.smallURL,
  749. connectEndpoint: resolved.connectEndpoint,
  750. isUpload: true,
  751. }, probeBytes, maxRuntime, func(capacity int64, rpm int32) {
  752. sendProgress(PhaseUpload, capacity, rpm)
  753. })
  754. if err != nil {
  755. cancel()
  756. uploadCh <- parallelResult{err: E.Cause(err, "measure upload")}
  757. return
  758. }
  759. uploadCh <- parallelResult{measurement: measurement}
  760. }()
  761. download := <-downloadCh
  762. upload := <-uploadCh
  763. wg.Wait()
  764. if download.err != nil {
  765. return nil, nil, download.err
  766. }
  767. if upload.err != nil {
  768. return nil, nil, upload.err
  769. }
  770. return download.measurement, upload.measurement, nil
  771. }
  772. func resolveConfigURL(rawURL string) string {
  773. rawURL = strings.TrimSpace(rawURL)
  774. if rawURL == "" {
  775. return DefaultConfigURL
  776. }
  777. if !strings.Contains(rawURL, "://") && !strings.Contains(rawURL, "/") {
  778. return "https://" + rawURL + "/.well-known/nq"
  779. }
  780. parsedURL, err := url.Parse(rawURL)
  781. if err != nil {
  782. return rawURL
  783. }
  784. if parsedURL.Scheme != "" && parsedURL.Host != "" && (parsedURL.Path == "" || parsedURL.Path == "/") {
  785. parsedURL.Path = "/.well-known/nq"
  786. return parsedURL.String()
  787. }
  788. return rawURL
  789. }
  790. func fetchConfig(ctx context.Context, client *http.Client, configURL string) (*Config, error) {
  791. req, err := newRequest(ctx, http.MethodGet, configURL, nil)
  792. if err != nil {
  793. return nil, err
  794. }
  795. resp, err := client.Do(req)
  796. if err != nil {
  797. return nil, err
  798. }
  799. defer resp.Body.Close()
  800. if err = validateResponse(resp); err != nil {
  801. return nil, err
  802. }
  803. var config Config
  804. if err = json.NewDecoder(resp.Body).Decode(&config); err != nil {
  805. return nil, E.Cause(err, "decode config")
  806. }
  807. return &config, nil
  808. }
  809. func validateConfig(config *Config) (*resolvedConfig, error) {
  810. if config == nil {
  811. return nil, E.New("config is nil")
  812. }
  813. if config.Version != 1 {
  814. return nil, E.New("unsupported config version: ", config.Version)
  815. }
  816. parseURL := func(name string, rawURL string) (*url.URL, error) {
  817. if rawURL == "" {
  818. return nil, E.New("config missing required URL: ", name)
  819. }
  820. parsedURL, err := url.Parse(rawURL)
  821. if err != nil {
  822. return nil, E.Cause(err, "parse "+name)
  823. }
  824. if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
  825. return nil, E.New("unsupported URL scheme in ", name, ": ", parsedURL.Scheme)
  826. }
  827. if parsedURL.Host == "" {
  828. return nil, E.New("config missing host in ", name)
  829. }
  830. return parsedURL, nil
  831. }
  832. smallURL, err := parseURL("small_download_url", config.URLs.smallDownloadURL())
  833. if err != nil {
  834. return nil, err
  835. }
  836. largeURL, err := parseURL("large_download_url", config.URLs.largeDownloadURL())
  837. if err != nil {
  838. return nil, err
  839. }
  840. uploadURL, err := parseURL("upload_url", config.URLs.uploadURL())
  841. if err != nil {
  842. return nil, err
  843. }
  844. if smallURL.Host != largeURL.Host || smallURL.Host != uploadURL.Host {
  845. return nil, E.New("config URLs must use the same host")
  846. }
  847. return &resolvedConfig{
  848. smallURL: smallURL,
  849. largeURL: largeURL,
  850. uploadURL: uploadURL,
  851. connectEndpoint: strings.TrimSpace(config.TestEndpoint),
  852. }, nil
  853. }
  854. func measureIdleLatency(ctx context.Context, factory MeasurementClientFactory, config *resolvedConfig) (int32, int64, error) {
  855. var latencies []int64
  856. var maxProbeBytes int64
  857. for i := 0; i < settings.idleProbeCount; i++ {
  858. select {
  859. case <-ctx.Done():
  860. return 0, 0, ctx.Err()
  861. default:
  862. }
  863. client, err := factory(config.connectEndpoint, true, true, nil, nil)
  864. if err != nil {
  865. return 0, 0, err
  866. }
  867. measurement, err := runProbe(ctx, client, config.smallURL.String(), false)
  868. client.CloseIdleConnections()
  869. if err != nil {
  870. return 0, 0, err
  871. }
  872. latencies = append(latencies, measurement.total.Milliseconds())
  873. if measurement.bytes > maxProbeBytes {
  874. maxProbeBytes = measurement.bytes
  875. }
  876. }
  877. sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })
  878. return int32(latencies[len(latencies)/2]), maxProbeBytes, nil
  879. }
  880. func measureDirection(
  881. ctx context.Context,
  882. factory MeasurementClientFactory,
  883. plan directionPlan,
  884. probeBytes int64,
  885. maxRuntime time.Duration,
  886. onProgress func(capacity int64, rpm int32),
  887. ) (*directionMeasurement, error) {
  888. phaseCtx, phaseCancel := context.WithTimeout(ctx, maxRuntime)
  889. defer phaseCancel()
  890. runner := newDirectionRunner(factory, plan, probeBytes)
  891. defer runner.wait()
  892. for i := 0; i < settings.initialConnections; i++ {
  893. err := runner.addConnection(phaseCtx)
  894. if err != nil {
  895. return nil, err
  896. }
  897. }
  898. runner.startProber(phaseCtx)
  899. throughputTracker := stabilityTracker{
  900. window: settings.movingAvgDistance,
  901. stdDevTolerancePct: settings.stdDevTolerancePct,
  902. }
  903. responsivenessTracker := stabilityTracker{
  904. window: settings.movingAvgDistance,
  905. stdDevTolerancePct: settings.stdDevTolerancePct,
  906. }
  907. start := time.Now()
  908. sampleTicker := time.NewTicker(settings.sampleInterval)
  909. defer sampleTicker.Stop()
  910. intervalTicker := time.NewTicker(settings.stabilityInterval)
  911. defer intervalTicker.Stop()
  912. progressTicker := time.NewTicker(settings.progressInterval)
  913. defer progressTicker.Stop()
  914. prevSampleBytes := int64(0)
  915. prevSampleTime := start
  916. prevIntervalBytes := int64(0)
  917. prevIntervalTime := start
  918. var ewmaCapacity float64
  919. var goodputSaturated bool
  920. var intervalIndex int
  921. for {
  922. select {
  923. case err := <-runner.errCh:
  924. return nil, err
  925. case now := <-sampleTicker.C:
  926. currentBytes := runner.totalBytes.Load()
  927. elapsed := now.Sub(prevSampleTime).Seconds()
  928. if elapsed > 0 {
  929. instantaneousBps := float64(currentBytes-prevSampleBytes) * 8 / elapsed
  930. if ewmaCapacity == 0 {
  931. ewmaCapacity = instantaneousBps
  932. } else {
  933. ewmaCapacity = 0.3*instantaneousBps + 0.7*ewmaCapacity
  934. }
  935. runner.currentCapacity.Store(int64(ewmaCapacity))
  936. }
  937. prevSampleBytes = currentBytes
  938. prevSampleTime = now
  939. case <-intervalTicker.C:
  940. now := time.Now()
  941. currentBytes := runner.totalBytes.Load()
  942. elapsed := now.Sub(prevIntervalTime).Seconds()
  943. if elapsed > 0 {
  944. intervalBps := float64(currentBytes-prevIntervalBytes) * 8 / elapsed
  945. runner.recordThroughput(intervalIndex, intervalBps)
  946. throughputStable := throughputTracker.add(intervalBps)
  947. if throughputStable && runner.throughputWindow == nil {
  948. runner.setThroughputWindow(intervalIndex)
  949. }
  950. if !goodputSaturated && (throughputStable || (runner.connectionCount() >= settings.maxConnections && throughputTracker.ready())) {
  951. goodputSaturated = true
  952. }
  953. if runner.connectionCount() < settings.maxConnections {
  954. err := runner.addConnection(phaseCtx)
  955. if err != nil {
  956. return nil, err
  957. }
  958. }
  959. }
  960. if goodputSaturated {
  961. if values := runner.swapIntervalProbeValues(); len(values) > 0 {
  962. if responsivenessTracker.add(upperTrimmedMean(values, settings.trimPercent)) && runner.responsivenessWindow == nil {
  963. runner.setResponsivenessWindow(intervalIndex)
  964. phaseCancel()
  965. }
  966. }
  967. }
  968. prevIntervalBytes = currentBytes
  969. prevIntervalTime = now
  970. intervalIndex++
  971. runner.currentInterval.Store(int64(intervalIndex))
  972. case <-progressTicker.C:
  973. if onProgress != nil {
  974. onProgress(int64(ewmaCapacity), runner.currentRPM.Load())
  975. }
  976. case <-phaseCtx.Done():
  977. if ctx.Err() != nil {
  978. return nil, ctx.Err()
  979. }
  980. totalDuration := time.Since(start)
  981. return &directionMeasurement{
  982. capacity: runner.finalCapacity(totalDuration),
  983. rpm: runner.finalRPM(),
  984. capacityAccuracy: throughputTracker.accuracy(),
  985. rpmAccuracy: responsivenessTracker.accuracy(),
  986. }, nil
  987. }
  988. }
  989. }
  990. func splitRuntimeBudget(total time.Duration, directions int) (time.Duration, time.Duration) {
  991. if directions <= 1 {
  992. return total, total
  993. }
  994. first := total / time.Duration(directions)
  995. second := total - first
  996. return first, second
  997. }
  998. func collectProbeRound(ctx context.Context, foreignClient *http.Client, selfClient *http.Client, rawURL string) (probeMeasurement, error) {
  999. var foreignResult probeMeasurement
  1000. var selfResult probeMeasurement
  1001. var foreignErr error
  1002. var selfErr error
  1003. var wg sync.WaitGroup
  1004. wg.Add(2)
  1005. go func() {
  1006. defer wg.Done()
  1007. foreignResult, foreignErr = runProbe(ctx, foreignClient, rawURL, false)
  1008. }()
  1009. go func() {
  1010. defer wg.Done()
  1011. selfResult, selfErr = runProbe(ctx, selfClient, rawURL, true)
  1012. }()
  1013. wg.Wait()
  1014. if foreignErr != nil {
  1015. return probeMeasurement{}, E.Cause(foreignErr, "foreign probe")
  1016. }
  1017. if selfErr != nil {
  1018. return probeMeasurement{}, E.Cause(selfErr, "self probe")
  1019. }
  1020. return probeMeasurement{
  1021. tcp: foreignResult.tcp,
  1022. tls: foreignResult.tls,
  1023. httpFirst: foreignResult.httpFirst,
  1024. httpLoaded: selfResult.httpLoaded,
  1025. }, nil
  1026. }
  1027. func runProbe(ctx context.Context, client *http.Client, rawURL string, expectReuse bool) (probeMeasurement, error) {
  1028. var trace probeTrace
  1029. start := time.Now()
  1030. req, err := newRequest(httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
  1031. ConnectStart: func(string, string) {
  1032. if trace.connectStart.IsZero() {
  1033. trace.connectStart = time.Now()
  1034. }
  1035. },
  1036. ConnectDone: func(string, string, error) {
  1037. if trace.connectDone.IsZero() {
  1038. trace.connectDone = time.Now()
  1039. }
  1040. },
  1041. TLSHandshakeStart: func() {
  1042. if trace.tlsStart.IsZero() {
  1043. trace.tlsStart = time.Now()
  1044. }
  1045. },
  1046. TLSHandshakeDone: func(state tls.ConnectionState, _ error) {
  1047. if trace.tlsDone.IsZero() {
  1048. trace.tlsDone = time.Now()
  1049. trace.tlsVersion = state.Version
  1050. }
  1051. },
  1052. GotConn: func(info httptrace.GotConnInfo) {
  1053. trace.reused = info.Reused
  1054. trace.gotConn = time.Now()
  1055. },
  1056. WroteRequest: func(httptrace.WroteRequestInfo) {
  1057. trace.wroteRequest = time.Now()
  1058. },
  1059. GotFirstResponseByte: func() {
  1060. trace.firstResponseByte = time.Now()
  1061. },
  1062. }), http.MethodGet, rawURL, nil)
  1063. if err != nil {
  1064. return probeMeasurement{}, err
  1065. }
  1066. if !expectReuse {
  1067. req.Close = true
  1068. }
  1069. resp, err := client.Do(req)
  1070. if err != nil {
  1071. return probeMeasurement{}, err
  1072. }
  1073. defer resp.Body.Close()
  1074. if err = validateResponse(resp); err != nil {
  1075. return probeMeasurement{}, err
  1076. }
  1077. n, err := io.Copy(io.Discard, resp.Body)
  1078. end := time.Now()
  1079. if err != nil {
  1080. return probeMeasurement{}, err
  1081. }
  1082. if expectReuse && !trace.reused {
  1083. return probeMeasurement{}, E.New("self probe did not reuse an existing connection")
  1084. }
  1085. httpStart := trace.wroteRequest
  1086. if httpStart.IsZero() {
  1087. switch {
  1088. case !trace.tlsDone.IsZero():
  1089. httpStart = trace.tlsDone
  1090. case !trace.connectDone.IsZero():
  1091. httpStart = trace.connectDone
  1092. case !trace.gotConn.IsZero():
  1093. httpStart = trace.gotConn
  1094. default:
  1095. httpStart = start
  1096. }
  1097. }
  1098. measurement := probeMeasurement{
  1099. total: end.Sub(start),
  1100. bytes: n,
  1101. reused: trace.reused,
  1102. }
  1103. if !trace.connectStart.IsZero() && !trace.connectDone.IsZero() && trace.connectDone.After(trace.connectStart) {
  1104. measurement.tcp = trace.connectDone.Sub(trace.connectStart)
  1105. }
  1106. if !trace.tlsStart.IsZero() && !trace.tlsDone.IsZero() && trace.tlsDone.After(trace.tlsStart) {
  1107. measurement.tls = trace.tlsDone.Sub(trace.tlsStart)
  1108. if roundTrips := tlsHandshakeRoundTrips(trace.tlsVersion); roundTrips > 1 {
  1109. measurement.tls /= time.Duration(roundTrips)
  1110. }
  1111. }
  1112. if !trace.firstResponseByte.IsZero() && trace.firstResponseByte.After(httpStart) {
  1113. measurement.httpFirst = trace.firstResponseByte.Sub(httpStart)
  1114. }
  1115. if end.After(httpStart) {
  1116. measurement.httpLoaded = end.Sub(httpStart)
  1117. }
  1118. return measurement, nil
  1119. }
  1120. func runDownloadRequest(ctx context.Context, client *http.Client, rawURL string, onActive func()) error {
  1121. req, err := newRequest(ctx, http.MethodGet, rawURL, nil)
  1122. if err != nil {
  1123. return err
  1124. }
  1125. resp, err := client.Do(req)
  1126. if err != nil {
  1127. return err
  1128. }
  1129. defer resp.Body.Close()
  1130. err = validateResponse(resp)
  1131. if err != nil {
  1132. return err
  1133. }
  1134. if onActive != nil {
  1135. onActive()
  1136. }
  1137. _, err = sBufio.Copy(io.Discard, resp.Body)
  1138. if ctx.Err() != nil {
  1139. return nil
  1140. }
  1141. return err
  1142. }
  1143. func runUploadRequest(ctx context.Context, client *http.Client, rawURL string, onActive func()) error {
  1144. body := &uploadBody{
  1145. ctx: ctx,
  1146. onActive: onActive,
  1147. }
  1148. req, err := newRequest(ctx, http.MethodPost, rawURL, body)
  1149. if err != nil {
  1150. return err
  1151. }
  1152. req.ContentLength = -1
  1153. req.Header.Set("Content-Type", "application/octet-stream")
  1154. resp, err := client.Do(req)
  1155. if err != nil {
  1156. if ctx.Err() != nil {
  1157. return nil
  1158. }
  1159. return err
  1160. }
  1161. defer resp.Body.Close()
  1162. err = validateResponse(resp)
  1163. if err != nil {
  1164. return err
  1165. }
  1166. _, _ = io.Copy(io.Discard, resp.Body)
  1167. <-ctx.Done()
  1168. return nil
  1169. }
  1170. func newRequest(ctx context.Context, method string, rawURL string, body io.Reader) (*http.Request, error) {
  1171. req, err := http.NewRequestWithContext(ctx, method, rawURL, body)
  1172. if err != nil {
  1173. return nil, err
  1174. }
  1175. req.Header.Set("Accept-Encoding", "identity")
  1176. return req, nil
  1177. }
  1178. func validateResponse(resp *http.Response) error {
  1179. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  1180. return E.New("unexpected status: ", resp.Status)
  1181. }
  1182. if encoding := resp.Header.Get("Content-Encoding"); encoding != "" {
  1183. return E.New("unexpected content encoding: ", encoding)
  1184. }
  1185. return nil
  1186. }
  1187. func calculateRPM(rounds []probeRound) int32 {
  1188. if len(rounds) == 0 {
  1189. return 0
  1190. }
  1191. var tcpSamples []float64
  1192. var tlsSamples []float64
  1193. var httpFirstSamples []float64
  1194. var httpLoadedSamples []float64
  1195. for _, round := range rounds {
  1196. if round.tcp > 0 {
  1197. tcpSamples = append(tcpSamples, durationMillis(round.tcp))
  1198. }
  1199. if round.tls > 0 {
  1200. tlsSamples = append(tlsSamples, durationMillis(round.tls))
  1201. }
  1202. if round.httpFirst > 0 {
  1203. httpFirstSamples = append(httpFirstSamples, durationMillis(round.httpFirst))
  1204. }
  1205. if round.httpLoaded > 0 {
  1206. httpLoadedSamples = append(httpLoadedSamples, durationMillis(round.httpLoaded))
  1207. }
  1208. }
  1209. httpLoaded := upperTrimmedMean(httpLoadedSamples, settings.trimPercent)
  1210. if httpLoaded <= 0 {
  1211. return 0
  1212. }
  1213. var foreignComponents []float64
  1214. if tcp := upperTrimmedMean(tcpSamples, settings.trimPercent); tcp > 0 {
  1215. foreignComponents = append(foreignComponents, tcp)
  1216. }
  1217. if tls := upperTrimmedMean(tlsSamples, settings.trimPercent); tls > 0 {
  1218. foreignComponents = append(foreignComponents, tls)
  1219. }
  1220. if httpFirst := upperTrimmedMean(httpFirstSamples, settings.trimPercent); httpFirst > 0 {
  1221. foreignComponents = append(foreignComponents, httpFirst)
  1222. }
  1223. if len(foreignComponents) == 0 {
  1224. return 0
  1225. }
  1226. foreignLatency := meanFloat64s(foreignComponents)
  1227. foreignRPM := 60000.0 / foreignLatency
  1228. loadedRPM := 60000.0 / httpLoaded
  1229. return int32(math.Round((foreignRPM + loadedRPM) / 2))
  1230. }
  1231. func tlsHandshakeRoundTrips(version uint16) int {
  1232. switch version {
  1233. case tls.VersionTLS12, tls.VersionTLS11, tls.VersionTLS10:
  1234. return 2
  1235. default:
  1236. return 1
  1237. }
  1238. }
  1239. func durationMillis(value time.Duration) float64 {
  1240. return float64(value) / float64(time.Millisecond)
  1241. }
  1242. func upperTrimmedMean(values []float64, trimPercent int) float64 {
  1243. trimmed := upperTrimFloat64s(values, trimPercent)
  1244. if len(trimmed) == 0 {
  1245. return 0
  1246. }
  1247. return meanFloat64s(trimmed)
  1248. }
  1249. func upperTrimFloat64s(values []float64, trimPercent int) []float64 {
  1250. if len(values) == 0 {
  1251. return nil
  1252. }
  1253. trimmed := append([]float64(nil), values...)
  1254. sort.Float64s(trimmed)
  1255. if trimPercent <= 0 {
  1256. return trimmed
  1257. }
  1258. trimCount := int(math.Floor(float64(len(trimmed)) * float64(trimPercent) / 100))
  1259. if trimCount <= 0 || trimCount >= len(trimmed) {
  1260. return trimmed
  1261. }
  1262. return trimmed[:len(trimmed)-trimCount]
  1263. }
  1264. func meanFloat64s(values []float64) float64 {
  1265. if len(values) == 0 {
  1266. return 0
  1267. }
  1268. var total float64
  1269. for _, value := range values {
  1270. total += value
  1271. }
  1272. return total / float64(len(values))
  1273. }
  1274. func stdDevFloat64s(values []float64) float64 {
  1275. if len(values) == 0 {
  1276. return 0
  1277. }
  1278. mean := meanFloat64s(values)
  1279. var total float64
  1280. for _, value := range values {
  1281. delta := value - mean
  1282. total += delta * delta
  1283. }
  1284. return math.Sqrt(total / float64(len(values)))
  1285. }
  1286. type uploadBody struct {
  1287. ctx context.Context
  1288. activated atomic.Bool
  1289. onActive func()
  1290. }
  1291. func (u *uploadBody) Read(p []byte) (int, error) {
  1292. if err := u.ctx.Err(); err != nil {
  1293. return 0, err
  1294. }
  1295. clear(p)
  1296. n := len(p)
  1297. if n > 0 && u.onActive != nil && u.activated.CompareAndSwap(false, true) {
  1298. u.onActive()
  1299. }
  1300. return n, nil
  1301. }
  1302. func (u *uploadBody) Close() error {
  1303. return nil
  1304. }