main.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. package main
  2. import (
  3. "database/sql"
  4. "log"
  5. "os"
  6. "time"
  7. _ "github.com/lib/pq"
  8. )
  // dbConn is the PostgreSQL connection string. UR_DB_URL overrides the
// built-in local development default.
var dbConn = getEnvDefault("UR_DB_URL", "postgres://user:password@localhost/ur?sslmode=disable")

// getEnvDefault returns the environment variable key, or def when the
// variable is unset or set to the empty string.
func getEnvDefault(key, def string) string {
	value := os.Getenv(key)
	if value == "" {
		return def
	}
	return value
}
  16. func main() {
  17. log.SetFlags(log.Ltime | log.Ldate)
  18. log.SetOutput(os.Stdout)
  19. db, err := sql.Open("postgres", dbConn)
  20. if err != nil {
  21. log.Fatalln("database:", err)
  22. }
  23. err = setupDB(db)
  24. if err != nil {
  25. log.Fatalln("database:", err)
  26. }
  27. for {
  28. runAggregation(db)
  29. // Sleep until one minute past next midnight
  30. sleepUntilNext(24*time.Hour, 1*time.Minute)
  31. }
  32. }
  33. func runAggregation(db *sql.DB) {
  34. since := maxIndexedDay(db, "VersionSummary")
  35. log.Println("Aggregating VersionSummary data since", since)
  36. rows, err := aggregateVersionSummary(db, since)
  37. if err != nil {
  38. log.Fatalln("aggregate:", err)
  39. }
  40. log.Println("Inserted", rows, "rows")
  41. log.Println("Aggregating UserMovement data")
  42. rows, err = aggregateUserMovement(db)
  43. if err != nil {
  44. log.Fatalln("aggregate:", err)
  45. }
  46. log.Println("Inserted", rows, "rows")
  47. log.Println("Aggregating Performance data")
  48. since = maxIndexedDay(db, "Performance")
  49. rows, err = aggregatePerformance(db, since)
  50. if err != nil {
  51. log.Fatalln("aggregate:", err)
  52. }
  53. log.Println("Inserted", rows, "rows")
  54. log.Println("Aggregating BlockStats data")
  55. since = maxIndexedDay(db, "BlockStats")
  56. rows, err = aggregateBlockStats(db, since)
  57. if err != nil {
  58. log.Fatalln("aggregate:", err)
  59. }
  60. log.Println("Inserted", rows, "rows")
  61. }
  62. func sleepUntilNext(intv, margin time.Duration) {
  63. now := time.Now().UTC()
  64. next := now.Truncate(intv).Add(intv).Add(margin)
  65. log.Println("Sleeping until", next)
  66. time.Sleep(next.Sub(now))
  67. }
  // setupDB creates the VersionSummary, UserMovement, Performance and
// BlockStats tables and their indexes when they do not exist yet. It returns
// the first error from any CREATE statement, or nil when everything is in
// place.
func setupDB(db *sql.DB) error {
	tables := []string{
		`CREATE TABLE IF NOT EXISTS VersionSummary (
			Day TIMESTAMP NOT NULL,
			Version VARCHAR(8) NOT NULL,
			Count INTEGER NOT NULL
		)`,
		`CREATE TABLE IF NOT EXISTS UserMovement (
			Day TIMESTAMP NOT NULL,
			Added INTEGER NOT NULL,
			Bounced INTEGER NOT NULL,
			Removed INTEGER NOT NULL
		)`,
		`CREATE TABLE IF NOT EXISTS Performance (
			Day TIMESTAMP NOT NULL,
			TotFiles INTEGER NOT NULL,
			TotMiB INTEGER NOT NULL,
			SHA256Perf DOUBLE PRECISION NOT NULL,
			MemorySize INTEGER NOT NULL,
			MemoryUsageMiB INTEGER NOT NULL
		)`,
		`CREATE TABLE IF NOT EXISTS BlockStats (
			Day TIMESTAMP NOT NULL,
			Reports INTEGER NOT NULL,
			Total INTEGER NOT NULL,
			Renamed INTEGER NOT NULL,
			Reused INTEGER NOT NULL,
			Pulled INTEGER NOT NULL,
			CopyOrigin INTEGER NOT NULL,
			CopyOriginShifted INTEGER NOT NULL,
			CopyElsewhere INTEGER NOT NULL
		)`,
	}
	for _, stmt := range tables {
		if _, err := db.Exec(stmt); err != nil {
			return err
		}
	}

	// Each index is looked up by name first. Casting an unknown relation
	// name to regclass fails, and any such failure is taken to mean "create
	// it". A failing CREATE is reported to the caller instead of being
	// dropped.
	indexes := []struct{ check, create string }{
		{`SELECT 'UniqueDayVersionIndex'::regclass`, `CREATE UNIQUE INDEX UniqueDayVersionIndex ON VersionSummary (Day, Version)`},
		{`SELECT 'VersionDayIndex'::regclass`, `CREATE INDEX VersionDayIndex ON VersionSummary (Day)`},
		{`SELECT 'MovementDayIndex'::regclass`, `CREATE INDEX MovementDayIndex ON UserMovement (Day)`},
		{`SELECT 'PerformanceDayIndex'::regclass`, `CREATE INDEX PerformanceDayIndex ON Performance (Day)`},
		{`SELECT 'BlockStatsDayIndex'::regclass`, `CREATE INDEX BlockStatsDayIndex ON BlockStats (Day)`},
	}
	for _, idx := range indexes {
		var existing string
		if err := db.QueryRow(idx.check).Scan(&existing); err == nil {
			continue // index already present
		}
		if _, err := db.Exec(idx.create); err != nil {
			return err
		}
	}
	return nil
}
  // maxIndexedDay returns the latest Day stored in table. It returns the zero
// time when the table is empty (MAX yields NULL, which cannot be scanned into
// a time.Time) or when the query fails for any other reason.
// table is concatenated into the SQL text, so it must be a trusted identifier.
func maxIndexedDay(db *sql.DB, table string) time.Time {
	var latest time.Time
	if err := db.QueryRow("SELECT MAX(Day) FROM " + table).Scan(&latest); err != nil {
		return time.Time{}
	}
	return latest
}
  // aggregateVersionSummary inserts one VersionSummary row per (day, version
// prefix) counting the v0.x reports received that day. It covers every day
// strictly after since and before the database's current day. The version
// prefix is what '^v\d.\d+' matches (e.g. "v0.14"). It returns the number of
// rows inserted.
//
// NOTE(review): the '.' in that regex is unescaped and so matches any
// character. The LIKE 'v0.%' filter means only a literal '.' ever reaches it.
func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) {
	const query = `INSERT INTO VersionSummary (
SELECT
DATE_TRUNC('day', Received) AS Day,
SUBSTRING(Version FROM '^v\d.\d+') AS Ver,
COUNT(*) AS Count
FROM Reports
WHERE
DATE_TRUNC('day', Received) > $1
AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND Version like 'v0.%'
GROUP BY Day, Ver
);
`
	result, err := db.Exec(query, since)
	if err != nil {
		return 0, err
	}
	return result.RowsAffected()
}
  // userMovementDay is one row of the UserMovement table: how many users were
// added, removed and bounced on a given day.
type userMovementDay struct {
	day     time.Time
	added   int
	removed int
	bounced int
}

// aggregateUserMovement rebuilds the entire UserMovement table from the v0.x
// rows of Reports. It writes one row per day from the earliest report day up
// to (not including) the current UTC day and returns the number of rows
// written.
//
// Each UniqueID is classified by the first and last day it was seen:
//   - bounced: first and last day are the same and more than 30 days ago;
//   - otherwise added on its first day, and removed on its last day if that
//     day is more than 30 days ago.
//
// The old contents are deleted and the new rows inserted in one transaction,
// so an error part-way leaves the previous table contents in place.
func aggregateUserMovement(db *sql.DB) (int64, error) {
	firstSeen, lastSeen, start, err := loadUserSightings(db)
	if err != nil {
		return 0, err
	}

	// With no reports there is no start day. Without this guard the day loop
	// would begin at the zero time (year 1) and emit hundreds of thousands of
	// empty rows.
	var days []userMovementDay
	if len(firstSeen) > 0 {
		end := time.Now().Truncate(24 * time.Hour) // midnight UTC today
		cutoff := time.Now().AddDate(0, 0, -30)
		days = countUserMovement(firstSeen, lastSeen, start, end, cutoff)
	}

	if err := storeUserMovement(db, days); err != nil {
		return 0, err
	}
	return int64(len(days)), nil
}

// loadUserSightings returns, for every UniqueID in the v0.x reports of
// completed days, the first and last day it was seen, plus the earliest day
// of any report. Days are truncated to midnight by the query.
func loadUserSightings(db *sql.DB) (firstSeen, lastSeen map[string]time.Time, start time.Time, err error) {
	rows, err := db.Query(`SELECT
		DATE_TRUNC('day', Received) AS Day,
		UniqueID
		FROM Reports
		WHERE
		DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
		AND Version like 'v0.%'
		ORDER BY Day
	`)
	if err != nil {
		return nil, nil, time.Time{}, err
	}
	defer rows.Close()

	firstSeen = make(map[string]time.Time)
	lastSeen = make(map[string]time.Time)
	for rows.Next() {
		var ts time.Time
		var id string
		if err := rows.Scan(&ts, &id); err != nil {
			return nil, nil, time.Time{}, err
		}
		if len(firstSeen) == 0 {
			start = ts // rows are ordered by day, so the first row is the earliest
		}
		if _, ok := firstSeen[id]; !ok {
			firstSeen[id] = ts
		}
		lastSeen[id] = ts // later rows overwrite, leaving the latest day
	}
	// A failure mid-stream ends rows.Next early. Without this check the table
	// would be rebuilt from partial data.
	if err := rows.Err(); err != nil {
		return nil, nil, time.Time{}, err
	}
	return firstSeen, lastSeen, start, nil
}

// countUserMovement buckets every user into its first/last day and returns
// one row per day in [start, end), stepping one calendar day at a time.
// Days are keyed by Unix seconds, so instants are compared independently of
// their *time.Location. This assumes day values carry no sub-second part,
// which DATE_TRUNC guarantees.
func countUserMovement(firstSeen, lastSeen map[string]time.Time, start, end, cutoff time.Time) []userMovementDay {
	added := make(map[int64]int)
	removed := make(map[int64]int)
	bounced := make(map[int64]int)
	for id, first := range firstSeen {
		last := lastSeen[id]
		if first.Equal(last) && first.Before(cutoff) {
			bounced[first.Unix()]++
			continue
		}
		added[first.Unix()]++
		if last.Before(cutoff) {
			removed[last.Unix()]++
		}
	}

	var out []userMovementDay
	for t := start; t.Before(end); t = t.AddDate(0, 0, 1) {
		k := t.Unix()
		out = append(out, userMovementDay{day: t, added: added[k], removed: removed[k], bounced: bounced[k]})
	}
	return out
}

// storeUserMovement replaces the contents of UserMovement with days inside a
// single transaction.
func storeUserMovement(db *sql.DB, days []userMovementDay) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Undo everything on any early return. After a successful Commit this is
	// a no-op that returns sql.ErrTxDone, which is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	if _, err := tx.Exec("DELETE FROM UserMovement"); err != nil {
		return err
	}
	for _, d := range days {
		if _, err := tx.Exec("INSERT INTO UserMovement (Day, Added, Removed, Bounced) VALUES ($1, $2, $3, $4)", d.day, d.added, d.removed, d.bounced); err != nil {
			return err
		}
	}
	return tx.Commit()
}
  // aggregatePerformance inserts one Performance row per day holding the
// averages of TotFiles, TotMiB, SHA256Perf, MemorySize and MemoryUsageMiB over
// that day's v0.x reports. It covers every day strictly after since and before
// the database's current day, and returns the number of rows inserted.
func aggregatePerformance(db *sql.DB, since time.Time) (int64, error) {
	const query = `INSERT INTO Performance (
SELECT
DATE_TRUNC('day', Received) AS Day,
AVG(TotFiles) As TotFiles,
AVG(TotMiB) As TotMiB,
AVG(SHA256Perf) As SHA256Perf,
AVG(MemorySize) As MemorySize,
AVG(MemoryUsageMiB) As MemoryUsageMiB
FROM Reports
WHERE
DATE_TRUNC('day', Received) > $1
AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND Version like 'v0.%'
GROUP BY Day
);
`
	result, err := db.Exec(query, since)
	if err != nil {
		return 0, err
	}
	return result.RowsAffected()
}
  // aggregateBlockStats inserts one BlockStats row per day with the report
// count and the summed block counters of that day's v0.x reports. It covers
// every day strictly after since and before the database's current day, and
// returns the number of rows inserted.
//
// Only ReportVersion 3 reports are used, and v0.14.38 through v0.14.40 are
// excluded: versions prior to 0.14.41 have sum aggregations which made no
// sense.
func aggregateBlockStats(db *sql.DB, since time.Time) (int64, error) {
	const query = `INSERT INTO BlockStats (
SELECT
DATE_TRUNC('day', Received) AS Day,
COUNT(1) As Reports,
SUM(BlocksTotal) AS Total,
SUM(BlocksRenamed) AS Renamed,
SUM(BlocksReused) AS Reused,
SUM(BlocksPulled) AS Pulled,
SUM(BlocksCopyOrigin) AS CopyOrigin,
SUM(BlocksCopyOriginShifted) AS CopyOriginShifted,
SUM(BlocksCopyElsewhere) AS CopyElsewhere
FROM Reports
WHERE
DATE_TRUNC('day', Received) > $1
AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND ReportVersion = 3
AND Version LIKE 'v0.%'
AND Version NOT LIKE 'v0.14.40%'
AND Version NOT LIKE 'v0.14.39%'
AND Version NOT LIKE 'v0.14.38%'
GROUP BY Day
);
`
	result, err := db.Exec(query, since)
	if err != nil {
		return 0, err
	}
	return result.RowsAffected()
}