main.go 4.6 KB


  1. package main
  2. import (
  3. "database/sql"
  4. "log"
  5. "os"
  6. "time"
  7. _ "github.com/lib/pq"
  8. )
  9. var dbConn = getEnvDefault("UR_DB_URL", "postgres://user:password@localhost/ur?sslmode=disable")
  10. func getEnvDefault(key, def string) string {
  11. if val := os.Getenv(key); val != "" {
  12. return val
  13. }
  14. return def
  15. }
  16. func main() {
  17. log.SetFlags(log.Ltime | log.Ldate)
  18. log.SetOutput(os.Stdout)
  19. db, err := sql.Open("postgres", dbConn)
  20. if err != nil {
  21. log.Fatalln("database:", err)
  22. }
  23. err = setupDB(db)
  24. if err != nil {
  25. log.Fatalln("database:", err)
  26. }
  27. for {
  28. runAggregation(db)
  29. // Sleep until one minute past next midnight
  30. sleepUntilNext(24*time.Hour, 1*time.Minute)
  31. }
  32. }
  33. func runAggregation(db *sql.DB) {
  34. since := maxIndexedDay(db, "VersionSummary")
  35. log.Println("Aggregating VersionSummary data since", since)
  36. rows, err := aggregateVersionSummary(db, since)
  37. if err != nil {
  38. log.Fatalln("aggregate:", err)
  39. }
  40. log.Println("Inserted", rows, "rows")
  41. log.Println("Aggregating UserMovement data")
  42. rows, err = aggregateUserMovement(db)
  43. if err != nil {
  44. log.Fatalln("aggregate:", err)
  45. }
  46. log.Println("Inserted", rows, "rows")
  47. }
  48. func sleepUntilNext(intv, margin time.Duration) {
  49. now := time.Now().UTC()
  50. next := now.Truncate(intv).Add(intv).Add(margin)
  51. log.Println("Sleeping until", next)
  52. time.Sleep(next.Sub(now))
  53. }
  54. func setupDB(db *sql.DB) error {
  55. _, err := db.Exec(`CREATE TABLE IF NOT EXISTS VersionSummary (
  56. Day TIMESTAMP NOT NULL,
  57. Version VARCHAR(8) NOT NULL,
  58. Count INTEGER NOT NULL
  59. )`)
  60. if err != nil {
  61. return err
  62. }
  63. _, err = db.Exec(`CREATE TABLE IF NOT EXISTS UserMovement (
  64. Day TIMESTAMP NOT NULL,
  65. Added INTEGER NOT NULL,
  66. Bounced INTEGER NOT NULL,
  67. Removed INTEGER NOT NULL
  68. )`)
  69. if err != nil {
  70. return err
  71. }
  72. row := db.QueryRow(`SELECT 'UniqueDayVersionIndex'::regclass`)
  73. if err := row.Scan(nil); err != nil {
  74. _, err = db.Exec(`CREATE UNIQUE INDEX UniqueDayVersionIndex ON VersionSummary (Day, Version)`)
  75. }
  76. row = db.QueryRow(`SELECT 'DayIndex'::regclass`)
  77. if err := row.Scan(nil); err != nil {
  78. _, err = db.Exec(`CREATE INDEX DayIndex ON VerionSummary (Day)`)
  79. }
  80. row = db.QueryRow(`SELECT 'MovementDayIndex'::regclass`)
  81. if err := row.Scan(nil); err != nil {
  82. _, err = db.Exec(`CREATE INDEX MovementDayIndex ON UserMovement (Day)`)
  83. }
  84. return err
  85. }
  86. func maxIndexedDay(db *sql.DB, table string) time.Time {
  87. var t time.Time
  88. row := db.QueryRow("SELECT MAX(Day) FROM " + table)
  89. err := row.Scan(&t)
  90. if err != nil {
  91. return time.Time{}
  92. }
  93. return t
  94. }
  95. func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) {
  96. res, err := db.Exec(`INSERT INTO VersionSummary (
  97. SELECT
  98. DATE_TRUNC('day', Received) AS Day,
  99. SUBSTRING(Version FROM '^v\d.\d+') AS Ver,
  100. COUNT(*) AS Count
  101. FROM Reports
  102. WHERE
  103. DATE_TRUNC('day', Received) > $1
  104. AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
  105. AND Version like 'v0.%'
  106. GROUP BY Day, Ver
  107. );
  108. `, since)
  109. if err != nil {
  110. return 0, err
  111. }
  112. return res.RowsAffected()
  113. }
  114. func aggregateUserMovement(db *sql.DB) (int64, error) {
  115. rows, err := db.Query(`SELECT
  116. DATE_TRUNC('day', Received) AS Day,
  117. UniqueID
  118. FROM Reports
  119. WHERE
  120. DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
  121. AND Version like 'v0.%'
  122. ORDER BY Day
  123. `)
  124. if err != nil {
  125. return 0, err
  126. }
  127. defer rows.Close()
  128. firstSeen := make(map[string]time.Time)
  129. lastSeen := make(map[string]time.Time)
  130. var minTs time.Time
  131. for rows.Next() {
  132. var ts time.Time
  133. var id string
  134. if err := rows.Scan(&ts, &id); err != nil {
  135. return 0, err
  136. }
  137. if minTs.IsZero() {
  138. minTs = ts
  139. }
  140. if _, ok := firstSeen[id]; !ok {
  141. firstSeen[id] = ts
  142. }
  143. lastSeen[id] = ts
  144. }
  145. type sumRow struct {
  146. day time.Time
  147. added int
  148. removed int
  149. bounced int
  150. }
  151. var sumRows []sumRow
  152. for t := minTs; t.Before(time.Now().Truncate(24 * time.Hour)); t = t.AddDate(0, 0, 1) {
  153. var added, removed, bounced int
  154. old := t.Before(time.Now().AddDate(0, 0, -14))
  155. for id, first := range firstSeen {
  156. last := lastSeen[id]
  157. if first.Equal(t) && last.Equal(t) && old {
  158. bounced++
  159. continue
  160. }
  161. if first.Equal(t) {
  162. added++
  163. }
  164. if last == t && old {
  165. removed++
  166. }
  167. }
  168. sumRows = append(sumRows, sumRow{t, added, removed, bounced})
  169. }
  170. tx, err := db.Begin()
  171. if err != nil {
  172. return 0, err
  173. }
  174. if _, err := tx.Exec("DELETE FROM UserMovement"); err != nil {
  175. tx.Rollback()
  176. return 0, err
  177. }
  178. for _, r := range sumRows {
  179. if _, err := tx.Exec("INSERT INTO UserMovement (Day, Added, Removed, Bounced) VALUES ($1, $2, $3, $4)", r.day, r.added, r.removed, r.bounced); err != nil {
  180. tx.Rollback()
  181. return 0, err
  182. }
  183. }
  184. return int64(len(sumRows)), tx.Commit()
  185. }