aggregate.go 8.1 KB


  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package aggregate
  7. import (
  8. "database/sql"
  9. "fmt"
  10. "log"
  11. "os"
  12. "time"
  13. _ "github.com/lib/pq"
  14. )
  // CLI carries the configuration for the aggregation command.
  type CLI struct {
  	// DBConn is the data source name passed to sql.Open for the "postgres"
  	// driver. The struct tags name the UR_DB_URL environment variable and a
  	// default value for it.
  	DBConn string `env:"UR_DB_URL" default:"postgres://user:password@localhost/ur?sslmode=disable"`
  }
  18. func (cli *CLI) Run() error {
  19. log.SetFlags(log.Ltime | log.Ldate)
  20. log.SetOutput(os.Stdout)
  21. db, err := sql.Open("postgres", cli.DBConn)
  22. if err != nil {
  23. return fmt.Errorf("database: %w", err)
  24. }
  25. err = setupDB(db)
  26. if err != nil {
  27. return fmt.Errorf("database: %w", err)
  28. }
  29. for {
  30. runAggregation(db)
  31. // Sleep until one minute past next midnight
  32. sleepUntilNext(24*time.Hour, 1*time.Minute)
  33. }
  34. }
  35. func runAggregation(db *sql.DB) {
  36. since := maxIndexedDay(db, "VersionSummary")
  37. log.Println("Aggregating VersionSummary data since", since)
  38. rows, err := aggregateVersionSummary(db, since.Add(24*time.Hour))
  39. if err != nil {
  40. log.Println("aggregate:", err)
  41. }
  42. log.Println("Inserted", rows, "rows")
  43. since = maxIndexedDay(db, "Performance")
  44. log.Println("Aggregating Performance data since", since)
  45. rows, err = aggregatePerformance(db, since.Add(24*time.Hour))
  46. if err != nil {
  47. log.Println("aggregate:", err)
  48. }
  49. log.Println("Inserted", rows, "rows")
  50. since = maxIndexedDay(db, "BlockStats")
  51. log.Println("Aggregating BlockStats data since", since)
  52. rows, err = aggregateBlockStats(db, since.Add(24*time.Hour))
  53. if err != nil {
  54. log.Println("aggregate:", err)
  55. }
  56. log.Println("Inserted", rows, "rows")
  57. }
  // sleepUntilNext blocks until margin after the next multiple of intv.
  //
  // The boundary is found by truncating the current UTC time to intv and
  // adding one interval; the wake-up time is logged before sleeping.
  func sleepUntilNext(intv, margin time.Duration) {
  	start := time.Now().UTC()
  	boundary := start.Truncate(intv).Add(intv)
  	wakeAt := boundary.Add(margin)
  	log.Println("Sleeping until", wakeAt)
  	time.Sleep(wakeAt.Sub(start))
  }
  // setupDB creates the aggregation tables (VersionSummary, UserMovement,
  // Performance, BlockStats) if they do not exist yet, followed by their
  // indexes.
  //
  // A failure to create a table is returned to the caller. Index creation is
  // best effort: a failure is logged and setup carries on, so a missing index
  // never prevents startup.
  func setupDB(db *sql.DB) error {
  	tables := []string{
  		`CREATE TABLE IF NOT EXISTS VersionSummary (
  		Day TIMESTAMP NOT NULL,
  		Version VARCHAR(8) NOT NULL,
  		Count INTEGER NOT NULL
  		)`,
  		`CREATE TABLE IF NOT EXISTS UserMovement (
  		Day TIMESTAMP NOT NULL,
  		Added INTEGER NOT NULL,
  		Bounced INTEGER NOT NULL,
  		Removed INTEGER NOT NULL
  		)`,
  		`CREATE TABLE IF NOT EXISTS Performance (
  		Day TIMESTAMP NOT NULL,
  		TotFiles INTEGER NOT NULL,
  		TotMiB INTEGER NOT NULL,
  		SHA256Perf DOUBLE PRECISION NOT NULL,
  		MemorySize INTEGER NOT NULL,
  		MemoryUsageMiB INTEGER NOT NULL
  		)`,
  		`CREATE TABLE IF NOT EXISTS BlockStats (
  		Day TIMESTAMP NOT NULL,
  		Reports INTEGER NOT NULL,
  		Total BIGINT NOT NULL,
  		Renamed BIGINT NOT NULL,
  		Reused BIGINT NOT NULL,
  		Pulled BIGINT NOT NULL,
  		CopyOrigin BIGINT NOT NULL,
  		CopyOriginShifted BIGINT NOT NULL,
  		CopyElsewhere BIGINT NOT NULL
  		)`,
  	}
  	for _, stmt := range tables {
  		if _, err := db.Exec(stmt); err != nil {
  			return err
  		}
  	}

  	indexes := []struct {
  		name   string
  		create string
  	}{
  		{"UniqueDayVersionIndex", `CREATE UNIQUE INDEX UniqueDayVersionIndex ON VersionSummary (Day, Version)`},
  		{"VersionDayIndex", `CREATE INDEX VersionDayIndex ON VersionSummary (Day)`},
  		{"MovementDayIndex", `CREATE INDEX MovementDayIndex ON UserMovement (Day)`},
  		{"PerformanceDayIndex", `CREATE INDEX PerformanceDayIndex ON Performance (Day)`},
  		{"BlockStatsDayIndex", `CREATE INDEX BlockStatsDayIndex ON BlockStats (Day)`},
  	}
  	for _, idx := range indexes {
  		// Looking the name up as a regclass fails when no such relation
  		// exists; any failure here is taken to mean the index is missing.
  		var found string
  		if err := db.QueryRow(`SELECT '` + idx.name + `'::regclass`).Scan(&found); err == nil {
  			continue
  		}
  		if _, err := db.Exec(idx.create); err != nil {
  			// Best effort, but no longer silent.
  			log.Println("setupDB: creating index", idx.name+":", err)
  		}
  	}
  	return nil
  }
  // maxIndexedDay returns the most recent Day stored in table, truncated to
  // the day. It returns the zero time when the table is empty or when the
  // query fails; query failures are logged so they are not mistaken for an
  // empty table.
  //
  // table is concatenated into the SQL text, so it must be a trusted
  // identifier; callers in this file pass fixed table names.
  func maxIndexedDay(db *sql.DB, table string) time.Time {
  	// MAX over an empty table yields NULL; NullTime lets that be told apart
  	// from a real error.
  	var day sql.NullTime
  	row := db.QueryRow("SELECT MAX(DATE_TRUNC('day', Day)) FROM " + table)
  	if err := row.Scan(&day); err != nil {
  		log.Println("maxIndexedDay:", table+":", err)
  		return time.Time{}
  	}
  	if !day.Valid {
  		return time.Time{}
  	}
  	return day.Time
  }
  // aggregateVersionSummary inserts per-day, per-version report counts into
  // VersionSummary. It covers reports received after since and before the
  // start of the current day whose version looks like 'v_.%'; the version is
  // reduced to the part matched by '^v\d.\d+'.
  //
  // It returns the number of rows the insert affected.
  func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) {
  	const insertSummary = `INSERT INTO VersionSummary (
  	SELECT
  	DATE_TRUNC('day', Received) AS Day,
  	SUBSTRING(Report->>'version' FROM '^v\d.\d+') AS Ver,
  	COUNT(*) AS Count
  	FROM ReportsJson
  	WHERE
  	Received > $1
  	AND Received < DATE_TRUNC('day', NOW())
  	AND Report->>'version' like 'v_.%'
  	GROUP BY Day, Ver
  	);
  	`

  	result, err := db.Exec(insertSummary, since)
  	if err != nil {
  		return 0, err
  	}
  	return result.RowsAffected()
  }
  // aggregateUserMovement rebuilds the UserMovement table from scratch.
  //
  // It reads the day and uniqueID of every report received before the start
  // of the current day, notes the first and last day each ID was seen, and
  // writes one row per day from the earliest report day up to (not
  // including) today:
  //
  //   - Added counts IDs first seen on that day.
  //   - Removed counts IDs last seen on that day, having been first seen
  //     earlier.
  //   - Bounced counts IDs seen on that day only.
  //
  // Removed and Bounced are only reported for days more than 30 days in the
  // past; for more recent days an ID seen on a single day counts as Added.
  //
  // The table is replaced inside a single transaction. When there are no
  // reports at all the table simply ends up empty. The return value is the
  // number of rows written.
  func aggregateUserMovement(db *sql.DB) (int64, error) {
  	rows, err := db.Query(`SELECT
  	DATE_TRUNC('day', Received) AS Day,
  	Report->>'uniqueID'
  	FROM ReportsJson
  	WHERE
  	Report->>'uniqueID' IS NOT NULL
  	AND Received < DATE_TRUNC('day', NOW())
  	AND Report->>'version' like 'v_.%'
  	ORDER BY Day
  	`)
  	if err != nil {
  		return 0, err
  	}
  	defer rows.Close()

  	firstSeen := make(map[string]time.Time)
  	lastSeen := make(map[string]time.Time)
  	var minTs time.Time

  	// Rows arrive ordered by day, so the first row carries the earliest day
  	// and the last row seen for an ID carries its latest day.
  	for rows.Next() {
  		var ts time.Time
  		var id string
  		if err := rows.Scan(&ts, &id); err != nil {
  			return 0, err
  		}
  		if minTs.IsZero() {
  			minTs = ts
  		}
  		if _, ok := firstSeen[id]; !ok {
  			firstSeen[id] = ts
  		}
  		lastSeen[id] = ts
  	}
  	// Next also returns false on failure; do not aggregate a partial result.
  	if err := rows.Err(); err != nil {
  		return 0, err
  	}

  	// Bucket the IDs by day once, instead of rescanning every ID for every
  	// day. Keys are Unix seconds; all values are whole days.
  	type dayCounts struct {
  		first   int // first seen this day, seen again later
  		last    int // last seen this day, first seen earlier
  		onlyDay int // first and last seen this day
  	}
  	perDay := make(map[int64]*dayCounts)
  	bucket := func(t time.Time) *dayCounts {
  		c := perDay[t.Unix()]
  		if c == nil {
  			c = new(dayCounts)
  			perDay[t.Unix()] = c
  		}
  		return c
  	}
  	for id, first := range firstSeen {
  		last := lastSeen[id]
  		if first.Equal(last) {
  			bucket(first).onlyDay++
  			continue
  		}
  		bucket(first).first++
  		bucket(last).last++
  	}

  	type sumRow struct {
  		day     time.Time
  		added   int
  		removed int
  		bounced int
  	}
  	var sumRows []sumRow

  	now := time.Now()
  	end := now.Truncate(24 * time.Hour)
  	cutoff := now.AddDate(0, 0, -30)

  	// Without any reports minTs is the zero time; walking from year one to
  	// today would write hundreds of thousands of empty rows.
  	if len(firstSeen) > 0 {
  		for t := minTs; t.Before(end); t = t.AddDate(0, 0, 1) {
  			r := sumRow{day: t}
  			if c := perDay[t.Unix()]; c != nil {
  				if t.Before(cutoff) {
  					r.added, r.removed, r.bounced = c.first, c.last, c.onlyDay
  				} else {
  					// Too recent to call anyone gone: single-day IDs
  					// still count as added.
  					r.added = c.first + c.onlyDay
  				}
  			}
  			sumRows = append(sumRows, r)
  		}
  	}

  	tx, err := db.Begin()
  	if err != nil {
  		return 0, err
  	}
  	if _, err := tx.Exec("DELETE FROM UserMovement"); err != nil {
  		_ = tx.Rollback() // the Exec error is the one worth reporting
  		return 0, err
  	}
  	for _, r := range sumRows {
  		if _, err := tx.Exec("INSERT INTO UserMovement (Day, Added, Removed, Bounced) VALUES ($1, $2, $3, $4)", r.day, r.added, r.removed, r.bounced); err != nil {
  			_ = tx.Rollback() // the Exec error is the one worth reporting
  			return 0, err
  		}
  	}
  	if err := tx.Commit(); err != nil {
  		return 0, err
  	}
  	return int64(len(sumRows)), nil
  }
  // aggregatePerformance inserts per-day averages of totFiles, totMiB,
  // sha256Perf, memorySize and memoryUsageMiB into Performance. It covers
  // reports received after since and before the start of the current day
  // whose version looks like 'v_.%' and whose memorySize is below
  // 1073741824.
  //
  // It returns the number of rows the insert affected.
  func aggregatePerformance(db *sql.DB, since time.Time) (int64, error) {
  	const insertPerformance = `INSERT INTO Performance (
  	SELECT
  	DATE_TRUNC('day', Received) AS Day,
  	AVG((Report->>'totFiles')::numeric) As TotFiles,
  	AVG((Report->>'totMiB')::numeric) As TotMiB,
  	AVG((Report->>'sha256Perf')::numeric) As SHA256Perf,
  	AVG((Report->>'memorySize')::numeric) As MemorySize,
  	AVG((Report->>'memoryUsageMiB')::numeric) As MemoryUsageMiB
  	FROM ReportsJson
  	WHERE
  	Received > $1
  	AND Received < DATE_TRUNC('day', NOW())
  	AND Report->>'version' like 'v_.%'
  	/* Some custom implementation reported bytes when we expect megabytes, cap at petabyte */
  	AND (Report->>'memorySize')::numeric < 1073741824
  	GROUP BY Day
  	);
  	`

  	result, err := db.Exec(insertPerformance, since)
  	if err != nil {
  		return 0, err
  	}
  	return result.RowsAffected()
  }
  // aggregateBlockStats inserts per-day report counts and block statistic
  // sums (total, renamed, reused, pulled, copyOrigin, copyOriginShifted,
  // copyElsewhere) into BlockStats. It covers reports received after since
  // and before the start of the current day with urVersion >= 3 and a
  // version that looks like 'v_.%'.
  //
  // It returns the number of rows the insert affected.
  func aggregateBlockStats(db *sql.DB, since time.Time) (int64, error) {
  	// Versions before 0.14.41 reported sums that made no sense, so the
  	// v0.14.38 - v0.14.40 reports are excluded by name.
  	const insertBlockStats = `INSERT INTO BlockStats (
  	SELECT
  	DATE_TRUNC('day', Received) AS Day,
  	COUNT(1) As Reports,
  	SUM((Report->'blockStats'->>'total')::numeric)::bigint AS Total,
  	SUM((Report->'blockStats'->>'renamed')::numeric)::bigint AS Renamed,
  	SUM((Report->'blockStats'->>'reused')::numeric)::bigint AS Reused,
  	SUM((Report->'blockStats'->>'pulled')::numeric)::bigint AS Pulled,
  	SUM((Report->'blockStats'->>'copyOrigin')::numeric)::bigint AS CopyOrigin,
  	SUM((Report->'blockStats'->>'copyOriginShifted')::numeric)::bigint AS CopyOriginShifted,
  	SUM((Report->'blockStats'->>'copyElsewhere')::numeric)::bigint AS CopyElsewhere
  	FROM ReportsJson
  	WHERE
  	Received > $1
  	AND Received < DATE_TRUNC('day', NOW())
  	AND (Report->>'urVersion')::numeric >= 3
  	AND Report->>'version' like 'v_.%'
  	AND Report->>'version' NOT LIKE 'v0.14.40%'
  	AND Report->>'version' NOT LIKE 'v0.14.39%'
  	AND Report->>'version' NOT LIKE 'v0.14.38%'
  	GROUP BY Day
  	);
  	`

  	result, err := db.Exec(insertBlockStats, since)
  	if err != nil {
  		return 0, err
  	}
  	return result.RowsAffected()
  }