event.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package client
  import (
  	"bufio"
  	"context"
  	"encoding/json"
  	"fmt"
  	"net/http"
  	"reflect"
  	"strings"
  )
  10. var EventMap = map[string]any{
  11. "storage.write": EventStorageWrite{},
  12. "session.updated": EventSessionUpdated{},
  13. "message.updated": EventMessageUpdated{},
  14. }
  15. type EventMessage struct {
  16. Type string `json:"type"`
  17. Properties json.RawMessage `json:"properties"`
  18. }
  19. func (c *Client) Event(ctx context.Context) (<-chan any, error) {
  20. events := make(chan any)
  21. req, err := http.NewRequestWithContext(ctx, "GET", c.Server+"event", nil)
  22. if err != nil {
  23. return nil, err
  24. }
  25. resp, err := http.DefaultClient.Do(req)
  26. if err != nil {
  27. return nil, err
  28. }
  29. go func() {
  30. defer close(events)
  31. defer resp.Body.Close()
  32. scanner := bufio.NewScanner(resp.Body)
  33. for scanner.Scan() {
  34. line := scanner.Text()
  35. if strings.HasPrefix(line, "data: ") {
  36. data := strings.TrimPrefix(line, "data: ")
  37. var eventMsg EventMessage
  38. if err := json.Unmarshal([]byte(data), &eventMsg); err != nil {
  39. continue
  40. }
  41. eventTemplate, exists := EventMap[eventMsg.Type]
  42. if !exists {
  43. continue
  44. }
  45. eventValue := reflect.New(reflect.TypeOf(eventTemplate)).Interface()
  46. if err := json.Unmarshal(eventMsg.Properties, eventValue); err != nil {
  47. continue
  48. }
  49. select {
  50. case events <- eventValue:
  51. case <-ctx.Done():
  52. return
  53. }
  54. }
  55. }
  56. }()
  57. return events, nil
  58. }