main.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package main
  2. import (
  3. "database/sql"
  4. "log"
  5. "os"
  6. "time"
  7. _ "github.com/lib/pq"
  8. )
  9. var dbConn = getEnvDefault("UR_DB_URL", "postgres://user:password@localhost/ur?sslmode=disable")
  // getEnvDefault returns the value of the environment variable key, or def
  // when the variable is unset or set to the empty string.
  func getEnvDefault(key, def string) string {
  	val := os.Getenv(key)
  	if val == "" {
  		return def
  	}
  	return val
  }
  16. func main() {
  17. log.SetFlags(log.Ltime | log.Ldate)
  18. log.SetOutput(os.Stdout)
  19. db, err := sql.Open("postgres", dbConn)
  20. if err != nil {
  21. log.Fatalln("database:", err)
  22. }
  23. err = setupDB(db)
  24. if err != nil {
  25. log.Fatalln("database:", err)
  26. }
  27. for {
  28. runAggregation(db)
  29. // Sleep until one minute past next midnight
  30. sleepUntilNext(24*time.Hour, 1*time.Minute)
  31. }
  32. }
  33. func runAggregation(db *sql.DB) {
  34. since := maxIndexedDay(db, "VersionSummary")
  35. log.Println("Aggregating VersionSummary data since", since)
  36. rows, err := aggregateVersionSummary(db, since)
  37. if err != nil {
  38. log.Fatalln("aggregate:", err)
  39. }
  40. log.Println("Inserted", rows, "rows")
  41. log.Println("Aggregating UserMovement data")
  42. rows, err = aggregateUserMovement(db)
  43. if err != nil {
  44. log.Fatalln("aggregate:", err)
  45. }
  46. log.Println("Inserted", rows, "rows")
  47. log.Println("Aggregating Performance data")
  48. since = maxIndexedDay(db, "Performance")
  49. rows, err = aggregatePerformance(db, since)
  50. if err != nil {
  51. log.Fatalln("aggregate:", err)
  52. }
  53. log.Println("Inserted", rows, "rows")
  54. }
  // sleepUntilNext blocks until margin past the next multiple of intv, as
  // computed by Time.Truncate on the current UTC time. The wake-up time is
  // logged before sleeping.
  func sleepUntilNext(intv, margin time.Duration) {
  	start := time.Now().UTC()
  	boundary := start.Truncate(intv).Add(intv)
  	wake := boundary.Add(margin)
  	log.Println("Sleeping until", wake)
  	time.Sleep(wake.Sub(start))
  }
  // setupDB makes sure the three aggregation tables (VersionSummary,
  // UserMovement, Performance) and their indexes exist, creating whatever is
  // missing.
  //
  // It returns the first error reported by a CREATE TABLE or CREATE INDEX
  // statement, or nil when the schema is in place. Earlier revisions assigned
  // the CREATE INDEX error to a variable shadowed inside the if statement, so
  // index-creation failures were silently dropped; they are now returned.
  func setupDB(db *sql.DB) error {
  	tables := []string{
  		`CREATE TABLE IF NOT EXISTS VersionSummary (
Day TIMESTAMP NOT NULL,
Version VARCHAR(8) NOT NULL,
Count INTEGER NOT NULL
)`,
  		`CREATE TABLE IF NOT EXISTS UserMovement (
Day TIMESTAMP NOT NULL,
Added INTEGER NOT NULL,
Bounced INTEGER NOT NULL,
Removed INTEGER NOT NULL
)`,
  		`CREATE TABLE IF NOT EXISTS Performance (
Day TIMESTAMP NOT NULL,
TotFiles INTEGER NOT NULL,
TotMiB INTEGER NOT NULL,
SHA256Perf DOUBLE PRECISION NOT NULL,
MemorySize INTEGER NOT NULL,
MemoryUsageMiB INTEGER NOT NULL
)`,
  	}
  	for _, stmt := range tables {
  		if _, err := db.Exec(stmt); err != nil {
  			return err
  		}
  	}

  	// Each index is probed by casting its name to regclass and created only
  	// when the probe fails.
  	indexes := []struct {
  		probe  string
  		create string
  	}{
  		{
  			probe:  `SELECT 'UniqueDayVersionIndex'::regclass`,
  			create: `CREATE UNIQUE INDEX UniqueDayVersionIndex ON VersionSummary (Day, Version)`,
  		},
  		{
  			probe:  `SELECT 'VersionDayIndex'::regclass`,
  			create: `CREATE INDEX VersionDayIndex ON VersionSummary (Day)`,
  		},
  		{
  			probe:  `SELECT 'MovementDayIndex'::regclass`,
  			create: `CREATE INDEX MovementDayIndex ON UserMovement (Day)`,
  		},
  		{
  			probe:  `SELECT 'PerformanceDayIndex'::regclass`,
  			create: `CREATE INDEX PerformanceDayIndex ON Performance (Day)`,
  		},
  	}
  	for _, idx := range indexes {
  		var name string
  		if err := db.QueryRow(idx.probe).Scan(&name); err == nil {
  			// The probe resolved, so the index is taken to exist already.
  			continue
  		}
  		// The probe error itself is not reported: it is the expected signal
  		// for "index missing". A failure to create the index is reported.
  		if _, err := db.Exec(idx.create); err != nil {
  			return err
  		}
  	}
  	return nil
  }
  // maxIndexedDay returns the largest Day value stored in table. The table
  // name is concatenated straight into the query text, so it must be a
  // trusted identifier. If the query or the scan fails for any reason the
  // zero time.Time is returned instead of an error (presumably this also
  // covers an empty table, where MAX yields NULL — confirm against the
  // driver's behaviour).
  func maxIndexedDay(db *sql.DB, table string) time.Time {
  	var latest time.Time
  	query := "SELECT MAX(Day) FROM " + table
  	if err := db.QueryRow(query).Scan(&latest); err != nil {
  		return time.Time{}
  	}
  	return latest
  }
  // aggregateVersionSummary inserts per-day, per-version report counts into
  // VersionSummary. Only reports whose version starts with "v0." are
  // counted, grouped by the day of Received and by the version prefix that
  // the SUBSTRING pattern extracts. Days up to and including since, and the
  // current day, are excluded.
  //
  // It returns the number of rows affected as reported by the driver, or the
  // error from executing the statement.
  func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) {
  	const insertSummary = `INSERT INTO VersionSummary (
SELECT
DATE_TRUNC('day', Received) AS Day,
SUBSTRING(Version FROM '^v\d.\d+') AS Ver,
COUNT(*) AS Count
FROM Reports
WHERE
DATE_TRUNC('day', Received) > $1
AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND Version like 'v0.%'
GROUP BY Day, Ver
);
`
  	result, err := db.Exec(insertSummary, since)
  	if err != nil {
  		return 0, err
  	}
  	return result.RowsAffected()
  }
  // aggregateUserMovement rebuilds the UserMovement table from scratch.
  //
  // It reads the (day, UniqueID) pair of every "v0." report received before
  // the current day, records the first and last day each UniqueID was seen,
  // and then produces one summary row per day from the earliest report day up
  // to (but excluding) today:
  //
  //   - added:   IDs whose first report falls on that day
  //   - removed: IDs whose last report falls on that day, counted only for
  //     days more than 14 days in the past
  //   - bounced: IDs whose first and last report both fall on that day, for
  //     days more than 14 days in the past; these are counted as neither
  //     added nor removed
  //
  // The existing table content is deleted and the new rows are inserted in a
  // single transaction. It returns the number of rows inserted, or the first
  // error encountered (in which case the count is 0).
  func aggregateUserMovement(db *sql.DB) (int64, error) {
  	rows, err := db.Query(`SELECT
DATE_TRUNC('day', Received) AS Day,
UniqueID
FROM Reports
WHERE
DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND Version like 'v0.%'
ORDER BY Day
`)
  	if err != nil {
  		return 0, err
  	}
  	defer rows.Close()

  	firstSeen := make(map[string]time.Time)
  	lastSeen := make(map[string]time.Time)
  	var minTs time.Time

  	// Rows arrive ordered by day, so the first row carries the earliest day
  	// and each later row for an ID can simply overwrite lastSeen.
  	for rows.Next() {
  		var ts time.Time
  		var id string
  		if err := rows.Scan(&ts, &id); err != nil {
  			return 0, err
  		}

  		if minTs.IsZero() {
  			minTs = ts
  		}
  		if _, ok := firstSeen[id]; !ok {
  			firstSeen[id] = ts
  		}
  		lastSeen[id] = ts
  	}
  	// Next returning false can mean "error" as well as "done"; without this
  	// check a failed read would produce a summary from partial data.
  	if err := rows.Err(); err != nil {
  		return 0, err
  	}

  	type sumRow struct {
  		day     time.Time
  		added   int
  		removed int
  		bounced int
  	}
  	var sumRows []sumRow

  	// Take the clock once so every day is judged against the same instant.
  	now := time.Now()
  	today := now.Truncate(24 * time.Hour)
  	cutoff := now.AddDate(0, 0, -14)

  	// With no reports minTs is still the zero time; walking from there would
  	// generate a row for every day since year 1, so skip the loop entirely.
  	if !minTs.IsZero() {
  		for t := minTs; t.Before(today); t = t.AddDate(0, 0, 1) {
  			var added, removed, bounced int
  			old := t.Before(cutoff)
  			for id, first := range firstSeen {
  				last := lastSeen[id]
  				isFirst := first.Equal(t)
  				isLast := last.Equal(t)
  				if isFirst && isLast && old {
  					bounced++
  					continue
  				}
  				if isFirst {
  					added++
  				}
  				if isLast && old {
  					removed++
  				}
  			}
  			sumRows = append(sumRows, sumRow{day: t, added: added, removed: removed, bounced: bounced})
  		}
  	}

  	tx, err := db.Begin()
  	if err != nil {
  		return 0, err
  	}

  	if _, err := tx.Exec("DELETE FROM UserMovement"); err != nil {
  		tx.Rollback()
  		return 0, err
  	}
  	for _, r := range sumRows {
  		if _, err := tx.Exec("INSERT INTO UserMovement (Day, Added, Removed, Bounced) VALUES ($1, $2, $3, $4)", r.day, r.added, r.removed, r.bounced); err != nil {
  			tx.Rollback()
  			return 0, err
  		}
  	}

  	if err := tx.Commit(); err != nil {
  		return 0, err
  	}
  	return int64(len(sumRows)), nil
  }
  // aggregatePerformance inserts one row per day into Performance holding the
  // averages of TotFiles, TotMiB, SHA256Perf, MemorySize and MemoryUsageMiB
  // over that day's "v0." reports. Days up to and including since, and the
  // current day, are excluded.
  //
  // It returns the number of rows affected as reported by the driver, or the
  // error from executing the statement.
  func aggregatePerformance(db *sql.DB, since time.Time) (int64, error) {
  	const insertPerformance = `INSERT INTO Performance (
SELECT
DATE_TRUNC('day', Received) AS Day,
AVG(TotFiles) As TotFiles,
AVG(TotMiB) As TotMiB,
AVG(SHA256Perf) As SHA256Perf,
AVG(MemorySize) As MemorySize,
AVG(MemoryUsageMiB) As MemoryUsageMiB
FROM Reports
WHERE
DATE_TRUNC('day', Received) > $1
AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND Version like 'v0.%'
GROUP BY Day
);
`
  	result, err := db.Exec(insertPerformance, since)
  	if err != nil {
  		return 0, err
  	}
  	return result.RowsAffected()
  }
  231. }