datastore.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package dashboard
  2. import (
  3. "runtime"
  4. "sync"
  5. "time"
  6. log "github.com/Sirupsen/logrus"
  7. )
  8. const (
  9. // Number of entries to show in top N charts
  10. topClientsSize = 5
  11. // Redux action type names
  12. initMessageType = "INIT"
  13. tickMessageType = "TICK"
  14. )
  15. var (
  16. tickInterval = time.Second * 5
  17. maxWindow = time.Hour * 24
  18. rankingUpdateInterval = time.Hour * 6
  19. uniqueHeloRatioMax = 0.8
  20. maxTicks = int(maxWindow / tickInterval)
  21. nRankingBuffers = int(maxWindow / rankingUpdateInterval)
  22. LogHook = logHook(1)
  23. store = newDataStore()
  24. )
  25. // Keeps track of connection data that is buffered in the topClients
  26. // so the data can be removed after `maxWindow` interval has occurred.
  27. type conn struct {
  28. helo, domain, ip string
  29. }
  30. type dataStore struct {
  31. lock sync.Mutex
  32. // List of samples of RAM usage
  33. ramTicks []point
  34. // List of samples of number of connected clients
  35. nClientTicks []point
  36. // Up-to-date number of clients
  37. nClients uint64
  38. // Total number of clients in the current aggregation buffer
  39. nClientsInBuffer uint64
  40. topDomain bufferedRanking
  41. topHelo bufferedRanking
  42. topIP bufferedRanking
  43. // For notifying the store about new connections
  44. newConns chan conn
  45. subs map[string]chan<- *message
  46. }
  47. func newDataStore() *dataStore {
  48. newConns := make(chan conn, 64)
  49. subs := make(map[string]chan<- *message)
  50. ds := &dataStore{
  51. ramTicks: make([]point, 0, maxTicks),
  52. nClientTicks: make([]point, 0, maxTicks),
  53. topDomain: newBufferedRanking(nRankingBuffers),
  54. topHelo: newBufferedRanking(nRankingBuffers),
  55. topIP: newBufferedRanking(nRankingBuffers),
  56. newConns: newConns,
  57. subs: subs,
  58. }
  59. return ds
  60. }
  61. // Keeps track of top domain/helo/ip rankings, but buffered into multiple
  62. // maps so that old records can be efficiently kept track of and quickly removed
  63. type bufferedRanking []map[string]int
  64. func newBufferedRanking(nBuffers int) bufferedRanking {
  65. br := make([]map[string]int, nBuffers)
  66. for i := 0; i < nBuffers; i++ {
  67. br[i] = make(map[string]int)
  68. }
  69. return br
  70. }
  71. // Manages the list of top clients by domain, helo, and IP by updating buffered
  72. // record maps. At each `rankingUpdateInterval` we shift the maps and remove the
  73. // oldest, so rankings are always at most as old as `maxWindow`
  74. func (ds *dataStore) rankingManager() {
  75. ticker := time.NewTicker(rankingUpdateInterval)
  76. for {
  77. select {
  78. case c := <-ds.newConns:
  79. nHelos := len(ds.topHelo)
  80. if nHelos > 5 &&
  81. float64(nHelos)/float64(ds.nClientsInBuffer) > uniqueHeloRatioMax {
  82. // If too many unique HELO messages are detected as a ratio to the total
  83. // number of clients, quit collecting data until we roll over into the next
  84. // aggregation buffer.
  85. continue
  86. }
  87. ds.lock.Lock()
  88. ds.nClientsInBuffer++
  89. ds.topDomain[0][c.domain]++
  90. ds.topHelo[0][c.helo]++
  91. ds.topIP[0][c.ip]++
  92. ds.lock.Unlock()
  93. case <-ticker.C:
  94. ds.lock.Lock()
  95. // Add empty map at index 0 and shift other maps one down
  96. ds.nClientsInBuffer = 0
  97. ds.topDomain = append(
  98. []map[string]int{map[string]int{}},
  99. ds.topDomain[:len(ds.topDomain)-1]...)
  100. ds.topHelo = append(
  101. []map[string]int{map[string]int{}},
  102. ds.topHelo[:len(ds.topHelo)-1]...)
  103. ds.topIP = append(
  104. []map[string]int{map[string]int{}},
  105. ds.topHelo[:len(ds.topIP)-1]...)
  106. ds.lock.Unlock()
  107. case <-stopRankingManager:
  108. return
  109. }
  110. }
  111. }
  112. // Aggregates the rankings from the ranking buffer into a single map
  113. // for each of domain, helo, ip. This is what we send to the frontend.
  114. func (ds *dataStore) aggregateRankings() ranking {
  115. topDomain := make(map[string]int, len(ds.topDomain[0]))
  116. topHelo := make(map[string]int, len(ds.topHelo[0]))
  117. topIP := make(map[string]int, len(ds.topIP[0]))
  118. ds.lock.Lock()
  119. // Aggregate buffers
  120. for i := 0; i < nRankingBuffers; i++ {
  121. for domain, count := range ds.topDomain[i] {
  122. topDomain[domain] += count
  123. }
  124. for helo, count := range ds.topHelo[i] {
  125. topHelo[helo] += count
  126. }
  127. for ip, count := range ds.topIP[i] {
  128. topIP[ip] += count
  129. }
  130. }
  131. ds.lock.Unlock()
  132. return ranking{
  133. TopDomain: topDomain,
  134. TopHelo: topHelo,
  135. TopIP: topIP,
  136. }
  137. }
  138. // Adds a new ram point, removing old points if necessary
  139. func (ds *dataStore) addRAMPoint(p point) {
  140. if len(ds.ramTicks) == int(maxTicks) {
  141. ds.ramTicks = append(ds.ramTicks[1:], p)
  142. } else {
  143. ds.ramTicks = append(ds.ramTicks, p)
  144. }
  145. }
  146. // Adds a new nClients point, removing old points if necessary
  147. func (ds *dataStore) addNClientPoint(p point) {
  148. if len(ds.nClientTicks) == int(maxTicks) {
  149. ds.nClientTicks = append(ds.nClientTicks[1:], p)
  150. } else {
  151. ds.nClientTicks = append(ds.nClientTicks, p)
  152. }
  153. }
  154. func (ds *dataStore) subscribe(id string, c chan<- *message) {
  155. ds.subs[id] = c
  156. }
  157. func (ds *dataStore) unsubscribe(id string) {
  158. delete(ds.subs, id)
  159. }
  160. func (ds *dataStore) notify(m *message) {
  161. // Prevent concurrent read/write to maps in the store
  162. ds.lock.Lock()
  163. defer ds.lock.Unlock()
  164. for _, c := range ds.subs {
  165. select {
  166. case c <- m:
  167. default:
  168. }
  169. }
  170. }
  171. // Initiates a session with all historic data in the store
  172. func (ds *dataStore) initSession(sess *session) {
  173. store.subs[sess.id] <- &message{initMessageType, initFrame{
  174. Ram: store.ramTicks,
  175. NClients: store.nClientTicks,
  176. }}
  177. }
  178. type point struct {
  179. X time.Time `json:"x"`
  180. Y uint64 `json:"y"`
  181. }
  182. // Measures RAM and number of connected clients and sends a tick
  183. // message to all connected clients on the given interval
  184. func dataListener(interval time.Duration) {
  185. ticker := time.Tick(interval)
  186. memStats := &runtime.MemStats{}
  187. for {
  188. select {
  189. case t := <-ticker:
  190. runtime.ReadMemStats(memStats)
  191. ramPoint := point{t, memStats.Alloc}
  192. nClientPoint := point{t, store.nClients}
  193. log.WithFields(map[string]interface{}{
  194. "ram": ramPoint.Y,
  195. "clients": nClientPoint.Y,
  196. }).Info("Logging analytics data")
  197. store.addRAMPoint(ramPoint)
  198. store.addNClientPoint(nClientPoint)
  199. store.notify(&message{tickMessageType, dataFrame{
  200. Ram: ramPoint,
  201. NClients: nClientPoint,
  202. ranking: store.aggregateRankings(),
  203. }})
  204. case <-stopDataListener:
  205. return
  206. }
  207. }
  208. }
  209. // Keeps track of top clients by helo, ip, and domain
  210. type ranking struct {
  211. TopHelo map[string]int `json:"topHelo"`
  212. TopIP map[string]int `json:"topIP"`
  213. TopDomain map[string]int `json:"topDomain"`
  214. }
  215. type dataFrame struct {
  216. Ram point `json:"ram"`
  217. NClients point `json:"nClients"`
  218. ranking
  219. }
  220. type initFrame struct {
  221. Ram []point `json:"ram"`
  222. NClients []point `json:"nClients"`
  223. ranking
  224. }
  225. // Format of messages to be sent over WebSocket
  226. type message struct {
  227. Type string `json:"type"`
  228. Payload interface{} `json:"payload"`
  229. }
  230. type logHook int
  231. func (h logHook) Levels() []log.Level {
  232. return log.AllLevels
  233. }
  234. // Checks fired logs for information that is relevant to the dashboard
  235. func (h logHook) Fire(e *log.Entry) error {
  236. event, ok := e.Data["event"].(string)
  237. if !ok {
  238. return nil
  239. }
  240. var helo, ip, domain string
  241. if event == "mailfrom" {
  242. helo, ok = e.Data["helo"].(string)
  243. if !ok {
  244. return nil
  245. }
  246. if len(helo) > 16 {
  247. helo = helo[:16]
  248. }
  249. ip, ok = e.Data["address"].(string)
  250. if !ok {
  251. return nil
  252. }
  253. domain, ok = e.Data["domain"].(string)
  254. if !ok {
  255. return nil
  256. }
  257. }
  258. switch event {
  259. case "connect":
  260. store.lock.Lock()
  261. store.nClients++
  262. store.lock.Unlock()
  263. case "mailfrom":
  264. store.newConns <- conn{
  265. domain: domain,
  266. helo: helo,
  267. ip: ip,
  268. }
  269. case "disconnect":
  270. log.Infof("disconnect in dashboard, nclients: %d", store.nClients)
  271. store.lock.Lock()
  272. store.nClients--
  273. store.lock.Unlock()
  274. }
  275. return nil
  276. }