websocket.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package core
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/buger/jsonparser"
  11. "github.com/cdle/sillyplus/utils"
  12. "github.com/dop251/goja"
  13. "github.com/gin-gonic/gin"
  14. "github.com/gorilla/websocket"
  15. )
  16. func IsWebSocketRequest(req *http.Request) bool {
  17. if req.Header.Get("Upgrade") != "websocket" {
  18. return false
  19. }
  20. if !websocket.IsWebSocketUpgrade(req) {
  21. return false
  22. }
  23. return true
  24. }
  25. type WsPattern struct {
  26. Value map[string]interface{}
  27. Chan chan map[string]interface{}
  28. }
  29. type WsConn struct {
  30. conn *websocket.Conn
  31. patterns sync.Map
  32. Key int64
  33. sync.RWMutex
  34. }
  35. func (wc *WsConn) Close() error {
  36. return wc.conn.Close()
  37. }
  38. func (wc *WsConn) WriteMessage(messageType int, data []byte, pattern map[string]interface{}) (error, map[string]interface{}) {
  39. var res map[string]interface{}
  40. wp := &WsPattern{}
  41. var timeout int
  42. if pattern != nil {
  43. if v, ok := pattern["$timeout"]; ok {
  44. timeout = utils.Int(v)
  45. delete(pattern, "$timeout")
  46. }
  47. wp.Value = pattern
  48. key := atomic.AddInt64(&wc.Key, 1)
  49. wp.Chan = make(chan map[string]interface{}, 1)
  50. defer func() {
  51. close(wp.Chan)
  52. wc.patterns.Delete(key)
  53. }()
  54. wc.patterns.Store(key, wp)
  55. }
  56. var err error
  57. func() {
  58. wc.Lock()
  59. defer wc.Unlock()
  60. err = wc.conn.WriteMessage(messageType, data)
  61. }()
  62. if pattern != nil {
  63. if timeout == 0 {
  64. timeout = 5000
  65. }
  66. select {
  67. case res = <-wp.Chan:
  68. case <-time.After(time.Millisecond * time.Duration(timeout)):
  69. }
  70. }
  71. return err, res
  72. }
  73. // func (wc *WsConn) WriteMessage(messageType int, data []byte) error {
  74. // return wc.conn.WriteMessage(messageType, data)
  75. // }
  76. func handleWebsocket(c *gin.Context) {
  77. for _, function := range Functions {
  78. if len(function.Https) != 0 {
  79. for _, h := range function.Https {
  80. path := h.Path
  81. method := h.Method
  82. if c.Request.URL.Path == path && strings.HasPrefix(method, "W") {
  83. // connect
  84. var req = &Request{
  85. c: c,
  86. // uuid: uuid,
  87. }
  88. var upGrader = websocket.Upgrader{
  89. CheckOrigin: func(r *http.Request) bool {
  90. return true
  91. },
  92. }
  93. ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
  94. if err != nil {
  95. c.Writer.Write([]byte(err.Error()))
  96. return
  97. }
  98. wc := &WsConn{}
  99. req._event = "connect"
  100. wc.conn = ws
  101. go function.Handle(&CustomSender{
  102. F: &Factory{
  103. botplt: "websocket",
  104. },
  105. }, func(vm *goja.Runtime) {
  106. vm.Set("res", &Response{
  107. c: c,
  108. conn: wc,
  109. vm: vm,
  110. })
  111. vm.Set("req", req)
  112. })
  113. time.Sleep(time.Millisecond * 500)
  114. for {
  115. _, data, err := ws.ReadMessage()
  116. matched := false
  117. wc.patterns.Range(func(key, value any) bool {
  118. wp := value.(*WsPattern)
  119. // fmt.Println("wp.Value", wp.Value)
  120. for k, v := range wp.Value {
  121. value, _, _, err := jsonparser.Get(data, strings.Split(k, ".")...)
  122. // fmt.Println("k, v", k, v, "key path", strings.Split(k, "."), "data:", string(data), err)
  123. if err != nil {
  124. // fmt.Println("err1", err)
  125. return true
  126. }
  127. if string(value) != fmt.Sprint(v) {
  128. return true
  129. }
  130. matched = true
  131. // v2 := v
  132. // fmt.Println("v2 := v", v2, string(value))
  133. // err = json.Unmarshal(value, &v2)
  134. // if err != nil {
  135. // fmt.Println("err2", err)
  136. // return true
  137. // }
  138. // if v != v2 {
  139. // fmt.Println("err3", err)
  140. // return true
  141. // } else {
  142. // matched = true
  143. // }
  144. }
  145. // fmt.Println("matched", matched)
  146. if matched {
  147. var result = map[string]interface{}{}
  148. err := json.Unmarshal(data, &result)
  149. if err == nil {
  150. select {
  151. case wp.Chan <- result:
  152. default:
  153. }
  154. } else {
  155. // fmt.Println("err3", err)
  156. }
  157. return false
  158. }
  159. return true
  160. })
  161. if matched {
  162. continue
  163. }
  164. if err != nil { // disconnect
  165. req._event = "disconnect"
  166. for _, f2 := range Functions {
  167. if f2.UUID == function.UUID {
  168. function = f2
  169. }
  170. }
  171. function.Handle(&CustomSender{
  172. F: &Factory{
  173. botplt: "websocket",
  174. },
  175. }, func(vm *goja.Runtime) {
  176. vm.Set("res", &Response{
  177. c: c,
  178. conn: wc,
  179. vm: vm,
  180. })
  181. vm.Set("req", req)
  182. })
  183. ws.Close()
  184. break
  185. }
  186. req.bodyData = data
  187. req._event = "message"
  188. for _, f2 := range Functions {
  189. if f2.UUID == function.UUID {
  190. function = f2
  191. }
  192. }
  193. go function.Handle(&CustomSender{
  194. F: &Factory{
  195. botplt: "websocket",
  196. },
  197. }, func(vm *goja.Runtime) {
  198. vm.Set("res", &Response{
  199. c: c,
  200. conn: wc,
  201. vm: vm,
  202. })
  203. vm.Set("req", req)
  204. })
  205. }
  206. return
  207. }
  208. }
  209. }
  210. }
  211. }