main.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package main
  2. import (
  3. "database/sql"
  4. "log"
  5. "os"
  6. "time"
  7. _ "github.com/lib/pq"
  8. )
  9. var dbConn = getEnvDefault("UR_DB_URL", "postgres://user:password@localhost/ur?sslmode=disable")
  10. func getEnvDefault(key, def string) string {
  11. if val := os.Getenv(key); val != "" {
  12. return val
  13. }
  14. return def
  15. }
  16. func main() {
  17. log.SetFlags(log.Ltime | log.Ldate)
  18. log.SetOutput(os.Stdout)
  19. db, err := sql.Open("postgres", dbConn)
  20. if err != nil {
  21. log.Fatalln("database:", err)
  22. }
  23. err = setupDB(db)
  24. if err != nil {
  25. log.Fatalln("database:", err)
  26. }
  27. for {
  28. runAggregation(db)
  29. // Sleep until one minute past next midnight
  30. sleepUntilNext(24*time.Hour, 1*time.Minute)
  31. }
  32. }
  33. func runAggregation(db *sql.DB) {
  34. since := maxIndexedDay(db, "VersionSummary")
  35. log.Println("Aggregating VersionSummary data since", since)
  36. rows, err := aggregateVersionSummary(db, since)
  37. if err != nil {
  38. log.Fatalln("aggregate:", err)
  39. }
  40. log.Println("Inserted", rows, "rows")
  41. log.Println("Aggregating UserMovement data")
  42. rows, err = aggregateUserMovement(db)
  43. if err != nil {
  44. log.Fatalln("aggregate:", err)
  45. }
  46. log.Println("Inserted", rows, "rows")
  47. }
  48. func sleepUntilNext(intv, margin time.Duration) {
  49. now := time.Now().UTC()
  50. next := now.Truncate(intv).Add(intv).Add(margin)
  51. log.Println("Sleeping until", next)
  52. time.Sleep(next.Sub(now))
  53. }
  54. func setupDB(db *sql.DB) error {
  55. _, err := db.Exec(`CREATE TABLE IF NOT EXISTS VersionSummary (
  56. Day TIMESTAMP NOT NULL,
  57. Version VARCHAR(8) NOT NULL,
  58. Count INTEGER NOT NULL
  59. )`)
  60. if err != nil {
  61. return err
  62. }
  63. _, err = db.Exec(`CREATE TABLE IF NOT EXISTS UserMovement (
  64. Day TIMESTAMP NOT NULL,
  65. Added INTEGER NOT NULL,
  66. Removed INTEGER NOT NULL
  67. )`)
  68. if err != nil {
  69. return err
  70. }
  71. row := db.QueryRow(`SELECT 'UniqueDayVersionIndex'::regclass`)
  72. if err := row.Scan(nil); err != nil {
  73. _, err = db.Exec(`CREATE UNIQUE INDEX UniqueDayVersionIndex ON VersionSummary (Day, Version)`)
  74. }
  75. row = db.QueryRow(`SELECT 'DayIndex'::regclass`)
  76. if err := row.Scan(nil); err != nil {
  77. _, err = db.Exec(`CREATE INDEX DayIndex ON VerionSummary (Day)`)
  78. }
  79. row = db.QueryRow(`SELECT 'MovementDayIndex'::regclass`)
  80. if err := row.Scan(nil); err != nil {
  81. _, err = db.Exec(`CREATE INDEX MovementDayIndex ON UserMovement (Day)`)
  82. }
  83. return err
  84. }
  85. func maxIndexedDay(db *sql.DB, table string) time.Time {
  86. var t time.Time
  87. row := db.QueryRow("SELECT MAX(Day) FROM " + table)
  88. err := row.Scan(&t)
  89. if err != nil {
  90. return time.Time{}
  91. }
  92. return t
  93. }
  94. func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) {
  95. res, err := db.Exec(`INSERT INTO VersionSummary (
  96. SELECT
  97. DATE_TRUNC('day', Received) AS Day,
  98. SUBSTRING(Version FROM '^v\d.\d+') AS Ver,
  99. COUNT(*) AS Count
  100. FROM Reports
  101. WHERE
  102. DATE_TRUNC('day', Received) > $1
  103. AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
  104. AND Version like 'v0.%'
  105. GROUP BY Day, Ver
  106. );
  107. `, since)
  108. if err != nil {
  109. return 0, err
  110. }
  111. return res.RowsAffected()
  112. }
  113. func aggregateUserMovement(db *sql.DB) (int64, error) {
  114. rows, err := db.Query(`SELECT
  115. DATE_TRUNC('day', Received) AS Day,
  116. UniqueID
  117. FROM Reports
  118. WHERE
  119. DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
  120. AND Version like 'v0.%'
  121. ORDER BY Day
  122. `)
  123. if err != nil {
  124. return 0, err
  125. }
  126. defer rows.Close()
  127. firstSeen := make(map[string]time.Time)
  128. lastSeen := make(map[string]time.Time)
  129. var minTs time.Time
  130. for rows.Next() {
  131. var ts time.Time
  132. var id string
  133. if err := rows.Scan(&ts, &id); err != nil {
  134. return 0, err
  135. }
  136. if minTs.IsZero() {
  137. minTs = ts
  138. }
  139. if _, ok := firstSeen[id]; !ok {
  140. firstSeen[id] = ts
  141. }
  142. lastSeen[id] = ts
  143. }
  144. type sumRow struct {
  145. day time.Time
  146. added int
  147. removed int
  148. }
  149. var sumRows []sumRow
  150. for t := minTs; t.Before(time.Now().Truncate(24 * time.Hour)); t = t.AddDate(0, 0, 1) {
  151. var added, removed int
  152. for id, first := range firstSeen {
  153. last := lastSeen[id]
  154. if first.Equal(t) {
  155. added++
  156. }
  157. if last == t && t.Before(time.Now().AddDate(0, 0, -14)) {
  158. removed++
  159. }
  160. }
  161. sumRows = append(sumRows, sumRow{t, added, removed})
  162. }
  163. tx, err := db.Begin()
  164. if err != nil {
  165. return 0, err
  166. }
  167. if _, err := tx.Exec("DELETE FROM UserMovement"); err != nil {
  168. tx.Rollback()
  169. return 0, err
  170. }
  171. for _, r := range sumRows {
  172. if _, err := tx.Exec("INSERT INTO UserMovement (Day, Added, Removed) VALUES ($1, $2, $3)", r.day, r.added, r.removed); err != nil {
  173. tx.Rollback()
  174. return 0, err
  175. }
  176. }
  177. return int64(len(sumRows)), tx.Commit()
  178. }