event.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package client
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/json"
  6. "net/http"
  7. "reflect"
  8. "strings"
  9. )
  10. var EventMap = map[string]any{
  11. "storage.write": StorageWrite{},
  12. }
  13. type EventMessage struct {
  14. Type string `json:"type"`
  15. Properties json.RawMessage `json:"properties"`
  16. }
  17. func (c *Client) Event(ctx context.Context) (<-chan any, error) {
  18. events := make(chan any)
  19. req, err := http.NewRequestWithContext(ctx, "GET", c.Server+"event", nil)
  20. if err != nil {
  21. return nil, err
  22. }
  23. resp, err := http.DefaultClient.Do(req)
  24. if err != nil {
  25. return nil, err
  26. }
  27. go func() {
  28. defer close(events)
  29. defer resp.Body.Close()
  30. scanner := bufio.NewScanner(resp.Body)
  31. for scanner.Scan() {
  32. line := scanner.Text()
  33. if strings.HasPrefix(line, "data: ") {
  34. data := strings.TrimPrefix(line, "data: ")
  35. var eventMsg EventMessage
  36. if err := json.Unmarshal([]byte(data), &eventMsg); err != nil {
  37. continue
  38. }
  39. eventTemplate, exists := EventMap[eventMsg.Type]
  40. if !exists {
  41. continue
  42. }
  43. eventValue := reflect.New(reflect.TypeOf(eventTemplate)).Interface()
  44. if err := json.Unmarshal(eventMsg.Properties, eventValue); err != nil {
  45. continue
  46. }
  47. select {
  48. case events <- eventValue:
  49. case <-ctx.Done():
  50. return
  51. }
  52. }
  53. }
  54. }()
  55. return events, nil
  56. }