main.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package main
  2. import (
  3. "database/sql"
  4. _ "github.com/lib/pq"
  5. "log"
  6. "os"
  7. "time"
  8. )
  9. var dbConn = getEnvDefault("UR_DB_URL", "postgres://user:password@localhost/ur?sslmode=disable")
  10. func getEnvDefault(key, def string) string {
  11. if val := os.Getenv(key); val != "" {
  12. return val
  13. }
  14. return def
  15. }
  16. func main() {
  17. log.SetFlags(log.Ltime | log.Ldate)
  18. log.SetOutput(os.Stdout)
  19. db, err := sql.Open("postgres", dbConn)
  20. if err != nil {
  21. log.Fatalln("database:", err)
  22. }
  23. err = setupDB(db)
  24. if err != nil {
  25. log.Fatalln("database:", err)
  26. }
  27. for {
  28. runAggregation(db)
  29. // Sleep until one minute past next midnight
  30. sleepUntilNext(24*time.Hour, 1*time.Minute)
  31. }
  32. }
  33. func runAggregation(db *sql.DB) {
  34. since := maxIndexedDay(db)
  35. log.Println("Aggregating data since", since)
  36. rows, err := aggregate(db, since)
  37. if err != nil {
  38. log.Fatalln("aggregate:", err)
  39. }
  40. log.Println("Inserted", rows, "rows")
  41. }
  42. func sleepUntilNext(intv, margin time.Duration) {
  43. now := time.Now().UTC()
  44. next := now.Truncate(intv).Add(intv).Add(margin)
  45. log.Println("Sleeping until", next)
  46. time.Sleep(next.Sub(now))
  47. }
  48. func setupDB(db *sql.DB) error {
  49. _, err := db.Exec(`CREATE TABLE IF NOT EXISTS VersionSummary (
  50. Day TIMESTAMP NOT NULL,
  51. Version VARCHAR(8) NOT NULL,
  52. Count INTEGER NOT NULL
  53. )`)
  54. if err != nil {
  55. return err
  56. }
  57. row := db.QueryRow(`SELECT 'UniqueDayVersionIndex'::regclass`)
  58. if err := row.Scan(nil); err != nil {
  59. _, err = db.Exec(`CREATE UNIQUE INDEX UniqueDayVersionIndex ON VersionSummary (Day, Version)`)
  60. }
  61. row = db.QueryRow(`SELECT 'DayIndex'::regclass`)
  62. if err := row.Scan(nil); err != nil {
  63. _, err = db.Exec(`CREATE INDEX DayIndex ON VerionSummary (Day)`)
  64. }
  65. return err
  66. }
  67. func maxIndexedDay(db *sql.DB) time.Time {
  68. var t time.Time
  69. row := db.QueryRow("SELECT MAX(Day) FROM VersionSummary")
  70. err := row.Scan(&t)
  71. if err != nil {
  72. return time.Time{}
  73. }
  74. return t
  75. }
  76. func aggregate(db *sql.DB, since time.Time) (int64, error) {
  77. res, err := db.Exec(`INSERT INTO VersionSummary (
  78. SELECT
  79. DATE_TRUNC('day', Received) AS Day,
  80. SUBSTRING(Version FROM '^v\d.\d+') AS Ver,
  81. COUNT(*) AS Count
  82. FROM Reports
  83. WHERE
  84. DATE_TRUNC('day', Received) > $1
  85. AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
  86. AND Version like 'v0.%'
  87. GROUP BY Day, Ver
  88. );
  89. `, since)
  90. if err != nil {
  91. return 0, err
  92. }
  93. return res.RowsAffected()
  94. }