| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package main
- import (
- "database/sql"
- _ "github.com/lib/pq"
- "log"
- "os"
- "time"
- )
- var dbConn = getEnvDefault("UR_DB_URL", "postgres://user:password@localhost/ur?sslmode=disable")
// getEnvDefault returns the value of the environment variable key, or def
// when the variable is unset or set to the empty string.
func getEnvDefault(key, def string) string {
	fromEnv := os.Getenv(key)
	if fromEnv == "" {
		return def
	}
	return fromEnv
}
- func main() {
- log.SetFlags(log.Ltime | log.Ldate)
- log.SetOutput(os.Stdout)
- db, err := sql.Open("postgres", dbConn)
- if err != nil {
- log.Fatalln("database:", err)
- }
- err = setupDB(db)
- if err != nil {
- log.Fatalln("database:", err)
- }
- for {
- runAggregation(db)
- // Sleep until one minute past next midnight
- sleepUntilNext(24*time.Hour, 1*time.Minute)
- }
- }
- func runAggregation(db *sql.DB) {
- since := maxIndexedDay(db)
- log.Println("Aggregating data since", since)
- rows, err := aggregate(db, since)
- if err != nil {
- log.Fatalln("aggregate:", err)
- }
- log.Println("Inserted", rows, "rows")
- }
// sleepUntilNext blocks until margin past the next multiple of intv, computed
// on the current UTC time. The wake-up time is logged before sleeping.
//
// time.Time.Truncate rounds down relative to the zero time, so with
// intv = 24h the boundary falls on UTC midnight.
func sleepUntilNext(intv, margin time.Duration) {
	current := time.Now().UTC()

	// Start of the next interval, then the safety margin on top of it.
	boundary := current.Truncate(intv).Add(intv)
	wake := boundary.Add(margin)

	log.Println("Sleeping until", wake)
	time.Sleep(wake.Sub(current))
}
// setupDB makes sure the VersionSummary table and its two indexes exist.
//
// Every statement uses IF NOT EXISTS, so the function is safe to run on each
// start-up. It returns the first error reported by the database, or nil when
// all statements succeeded.
//
// Note: CREATE INDEX ... IF NOT EXISTS requires PostgreSQL 9.5 or later.
func setupDB(db *sql.DB) error {
	statements := []string{
		// One row per (day, version) with the number of reports seen.
		`CREATE TABLE IF NOT EXISTS VersionSummary (
			Day TIMESTAMP NOT NULL,
			Version VARCHAR(8) NOT NULL,
			Count INTEGER NOT NULL
		)`,
		// Guards against aggregating the same day twice.
		`CREATE UNIQUE INDEX IF NOT EXISTS UniqueDayVersionIndex ON VersionSummary (Day, Version)`,
		// Supports the MAX(Day) lookup and per-day queries.
		`CREATE INDEX IF NOT EXISTS DayIndex ON VersionSummary (Day)`,
	}

	for _, stmt := range statements {
		if _, err := db.Exec(stmt); err != nil {
			return err
		}
	}
	return nil
}
// maxIndexedDay returns the most recent Day stored in VersionSummary.
//
// It returns the zero time.Time when the table is empty (MAX yields NULL) or
// when the query fails. A query failure is logged so that it is not mistaken
// for an empty table; the return value is the zero time in both cases.
func maxIndexedDay(db *sql.DB) time.Time {
	// NullTime lets an empty table (NULL) be told apart from a real error.
	var latest sql.NullTime

	row := db.QueryRow("SELECT MAX(Day) FROM VersionSummary")
	if err := row.Scan(&latest); err != nil {
		log.Println("maxIndexedDay:", err)
		return time.Time{}
	}
	if !latest.Valid {
		return time.Time{}
	}
	return latest.Time
}
// aggregate summarises the Reports table into VersionSummary and returns the
// number of rows the INSERT affected.
//
// Reports are grouped by the day they were received and by the version prefix
// matched by the regular expression in the query. Only versions matching
// 'v0.%' are considered, and only days strictly after since and strictly
// before the current day.
func aggregate(db *sql.DB, since time.Time) (int64, error) {
	const insertSummary = `INSERT INTO VersionSummary (
	SELECT
		DATE_TRUNC('day', Received) AS Day,
		SUBSTRING(Version FROM '^v\d.\d+') AS Ver,
		COUNT(*) AS Count
		FROM Reports
		WHERE
			DATE_TRUNC('day', Received) > $1
			AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
			AND Version like 'v0.%'
		GROUP BY Day, Ver
	);
	`

	result, err := db.Exec(insertSummary, since)
	if err != nil {
		return 0, err
	}
	return result.RowsAffected()
}
|