cache.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. package cache
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "net/http"
  10. "strconv"
  11. "sync"
  12. "time"
  13. "github.com/bytedance/sonic"
  14. "github.com/gin-gonic/gin"
  15. "github.com/labring/aiproxy/core/common"
  16. "github.com/labring/aiproxy/core/model"
  17. "github.com/labring/aiproxy/core/relay/adaptor"
  18. "github.com/labring/aiproxy/core/relay/meta"
  19. "github.com/labring/aiproxy/core/relay/plugin"
  20. "github.com/labring/aiproxy/core/relay/plugin/noop"
  21. gcache "github.com/patrickmn/go-cache"
  22. "github.com/redis/go-redis/v9"
  23. )
  24. // Constants for cache metadata keys
  25. const (
  26. cacheKey = "cache_key"
  27. cacheHit = "cache_hit"
  28. cacheValue = "cache_value"
  29. )
  30. // Constants for plugin configuration
  31. const (
  32. pluginConfigCacheKey = "cache-config"
  33. cacheHeader = "X-Aiproxy-Cache"
  34. redisCachePrefix = "cache:"
  35. )
  36. // Buffer size constants
  37. const (
  38. defaultBufferSize = 512 * 1024
  39. maxBufferSize = 4 * defaultBufferSize
  40. )
  41. // Item represents a cached response
  42. type Item struct {
  43. Body []byte `json:"body"`
  44. Header map[string][]string `json:"header"`
  45. Usage model.Usage `json:"usage"`
  46. }
  47. // Cache implements caching functionality for AI requests
  48. type Cache struct {
  49. noop.Noop
  50. rdb *redis.Client
  51. }
  52. var (
  53. _ plugin.Plugin = (*Cache)(nil)
  54. // Global cache instance with 5 minute default TTL and 10 minute cleanup interval
  55. cache = gcache.New(30*time.Second, 5*time.Minute)
  56. // Buffer pool for response writers
  57. bufferPool = sync.Pool{
  58. New: func() any {
  59. return bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
  60. },
  61. }
  62. )
  63. // NewCachePlugin creates a new cache plugin
  64. func NewCachePlugin(rdb *redis.Client) plugin.Plugin {
  65. return &Cache{rdb: rdb}
  66. }
  67. // Cache metadata helpers
  68. func getCacheKey(meta *meta.Meta) string {
  69. return meta.GetString(cacheKey)
  70. }
  71. func setCacheKey(meta *meta.Meta, key string) {
  72. meta.Set(cacheKey, key)
  73. }
  74. func isCacheHit(meta *meta.Meta) bool {
  75. return meta.GetBool(cacheHit)
  76. }
  77. func getCacheItem(meta *meta.Meta) *Item {
  78. v, ok := meta.Get(cacheValue)
  79. if !ok {
  80. return nil
  81. }
  82. item, ok := v.(*Item)
  83. if !ok {
  84. panic(fmt.Sprintf("cache item type not match: %T", v))
  85. }
  86. return item
  87. }
  88. func setCacheHit(meta *meta.Meta, item *Item) {
  89. meta.Set(cacheHit, true)
  90. meta.Set(cacheValue, item)
  91. }
  92. // Buffer pool helpers
  93. func getBuffer() *bytes.Buffer {
  94. v, ok := bufferPool.Get().(*bytes.Buffer)
  95. if !ok {
  96. panic(fmt.Sprintf("buffer type error: %T, %v", v, v))
  97. }
  98. return v
  99. }
  100. func putBuffer(buf *bytes.Buffer) {
  101. buf.Reset()
  102. if buf.Cap() > maxBufferSize {
  103. return
  104. }
  105. bufferPool.Put(buf)
  106. }
  107. // getPluginConfig retrieves the plugin configuration from metadata
  108. func getPluginConfig(meta *meta.Meta) (config *Config, err error) {
  109. v, ok := meta.Get(pluginConfigCacheKey)
  110. if ok {
  111. config, ok := v.(*Config)
  112. if !ok {
  113. panic(fmt.Sprintf("cache config type not match: %T", v))
  114. }
  115. return config, nil
  116. }
  117. pluginConfig := Config{}
  118. if err := meta.ModelConfig.LoadPluginConfig("cache", &pluginConfig); err != nil {
  119. return nil, err
  120. }
  121. meta.Set(pluginConfigCacheKey, &pluginConfig)
  122. return &pluginConfig, nil
  123. }
  124. // Redis cache operations
  125. func (c *Cache) getFromRedis(ctx context.Context, key string) (*Item, error) {
  126. if c.rdb == nil {
  127. return nil, nil
  128. }
  129. data, err := c.rdb.Get(ctx, redisCachePrefix+key).Bytes()
  130. if err != nil {
  131. if errors.Is(err, redis.Nil) {
  132. return nil, nil
  133. }
  134. return nil, err
  135. }
  136. var item Item
  137. if err := sonic.Unmarshal(data, &item); err != nil {
  138. return nil, err
  139. }
  140. return &item, nil
  141. }
  142. func (c *Cache) setToRedis(ctx context.Context, key string, item *Item, ttl time.Duration) error {
  143. if c.rdb == nil {
  144. return nil
  145. }
  146. data, err := sonic.Marshal(item)
  147. if err != nil {
  148. return err
  149. }
  150. return c.rdb.Set(ctx, redisCachePrefix+key, data, ttl).Err()
  151. }
  152. // getFromCache retrieves item from cache (Redis or memory)
  153. func (c *Cache) getFromCache(ctx context.Context, key string) (*Item, bool) {
  154. // Try Redis first if available
  155. if c.rdb != nil {
  156. item, err := c.getFromRedis(ctx, key)
  157. if err == nil && item != nil {
  158. return item, true
  159. }
  160. // If Redis fails, fallback to memory cache
  161. }
  162. // Try memory cache
  163. if v, ok := cache.Get(key); ok {
  164. if item, ok := v.(Item); ok {
  165. return &item, true
  166. }
  167. }
  168. return nil, false
  169. }
  170. // setToCache stores item in cache (Redis and/or memory)
  171. func (c *Cache) setToCache(ctx context.Context, key string, item Item, ttl time.Duration) {
  172. // Set to Redis if available
  173. if c.rdb != nil {
  174. if err := c.setToRedis(ctx, key, &item, ttl); err == nil {
  175. // If Redis succeeds, also set to memory cache for faster access
  176. cache.Set(key, item, ttl)
  177. return
  178. }
  179. // If Redis fails, fallback to memory cache only
  180. }
  181. // Set to memory cache
  182. cache.Set(key, item, ttl)
  183. }
  184. // ConvertRequest handles the request conversion phase
  185. func (c *Cache) ConvertRequest(
  186. meta *meta.Meta,
  187. store adaptor.Store,
  188. req *http.Request,
  189. do adaptor.ConvertRequest,
  190. ) (adaptor.ConvertResult, error) {
  191. pluginConfig, err := getPluginConfig(meta)
  192. if err != nil {
  193. return do.ConvertRequest(meta, store, req)
  194. }
  195. if !pluginConfig.Enable {
  196. return do.ConvertRequest(meta, store, req)
  197. }
  198. body, err := common.GetRequestBody(req)
  199. if err != nil {
  200. return adaptor.ConvertResult{}, err
  201. }
  202. if len(body) == 0 {
  203. return do.ConvertRequest(meta, store, req)
  204. }
  205. // Generate hash as cache key
  206. hash := sha256.Sum256(body)
  207. cacheKey := fmt.Sprintf("%d:%s", meta.Mode, hex.EncodeToString(hash[:]))
  208. setCacheKey(meta, cacheKey)
  209. // Check cache
  210. ctx := req.Context()
  211. if item, ok := c.getFromCache(ctx, cacheKey); ok {
  212. setCacheHit(meta, item)
  213. return adaptor.ConvertResult{}, nil
  214. }
  215. return do.ConvertRequest(meta, store, req)
  216. }
  217. // DoRequest handles the request execution phase
  218. func (c *Cache) DoRequest(
  219. meta *meta.Meta,
  220. store adaptor.Store,
  221. ctx *gin.Context,
  222. req *http.Request,
  223. do adaptor.DoRequest,
  224. ) (*http.Response, error) {
  225. if isCacheHit(meta) {
  226. return &http.Response{}, nil
  227. }
  228. return do.DoRequest(meta, store, ctx, req)
  229. }
  230. // Custom response writer to capture response for caching
  231. type responseWriter struct {
  232. gin.ResponseWriter
  233. cacheBody *bytes.Buffer
  234. maxSize int
  235. overflow bool
  236. }
  237. func (rw *responseWriter) Write(b []byte) (int, error) {
  238. if rw.overflow {
  239. return rw.ResponseWriter.Write(b)
  240. }
  241. if rw.maxSize > 0 && rw.cacheBody.Len()+len(b) > rw.maxSize {
  242. rw.overflow = true
  243. rw.cacheBody.Reset()
  244. return rw.ResponseWriter.Write(b)
  245. }
  246. rw.cacheBody.Write(b)
  247. return rw.ResponseWriter.Write(b)
  248. }
  249. func (rw *responseWriter) WriteString(s string) (int, error) {
  250. if rw.overflow {
  251. return rw.ResponseWriter.WriteString(s)
  252. }
  253. if rw.maxSize > 0 && rw.cacheBody.Len()+len(s) > rw.maxSize {
  254. rw.overflow = true
  255. rw.cacheBody.Reset()
  256. return rw.ResponseWriter.WriteString(s)
  257. }
  258. rw.cacheBody.WriteString(s)
  259. return rw.ResponseWriter.WriteString(s)
  260. }
  261. func (c *Cache) writeCacheHeader(ctx *gin.Context, pluginConfig *Config, value string) {
  262. if pluginConfig.AddCacheHitHeader {
  263. header := pluginConfig.CacheHitHeader
  264. if header == "" {
  265. header = cacheHeader
  266. }
  267. ctx.Header(header, value)
  268. }
  269. }
  270. // DoResponse handles the response processing phase
  271. func (c *Cache) DoResponse(
  272. meta *meta.Meta,
  273. store adaptor.Store,
  274. ctx *gin.Context,
  275. resp *http.Response,
  276. do adaptor.DoResponse,
  277. ) (usage model.Usage, adapterErr adaptor.Error) {
  278. pluginConfig, err := getPluginConfig(meta)
  279. if err != nil {
  280. return do.DoResponse(meta, store, ctx, resp)
  281. }
  282. // Handle cache hit
  283. if isCacheHit(meta) {
  284. item := getCacheItem(meta)
  285. if item == nil {
  286. return do.DoResponse(meta, store, ctx, resp)
  287. }
  288. // Restore headers from cache
  289. for k, v := range item.Header {
  290. for _, val := range v {
  291. ctx.Header(k, val)
  292. }
  293. }
  294. // Override specific headers
  295. ctx.Header("Content-Type", item.Header["Content-Type"][0])
  296. ctx.Header("Content-Length", strconv.Itoa(len(item.Body)))
  297. c.writeCacheHeader(ctx, pluginConfig, "hit")
  298. _, _ = ctx.Writer.Write(item.Body)
  299. return item.Usage, nil
  300. }
  301. if !pluginConfig.Enable {
  302. return do.DoResponse(meta, store, ctx, resp)
  303. }
  304. c.writeCacheHeader(ctx, pluginConfig, "miss")
  305. // Set up response capture for caching
  306. buf := getBuffer()
  307. defer putBuffer(buf)
  308. rw := &responseWriter{
  309. ResponseWriter: ctx.Writer,
  310. maxSize: pluginConfig.ItemMaxSize,
  311. cacheBody: buf,
  312. }
  313. ctx.Writer = rw
  314. defer func() {
  315. ctx.Writer = rw.ResponseWriter
  316. if adapterErr != nil ||
  317. rw.overflow ||
  318. rw.cacheBody.Len() == 0 {
  319. return
  320. }
  321. // Convert http.Header to map[string][]string for JSON serialization
  322. headerMap := make(map[string][]string)
  323. for k, v := range rw.Header() {
  324. headerMap[k] = v
  325. }
  326. // Store in cache
  327. item := Item{
  328. Body: bytes.Clone(rw.cacheBody.Bytes()),
  329. Header: headerMap,
  330. Usage: usage,
  331. }
  332. ttl := time.Duration(pluginConfig.TTL) * time.Second
  333. c.setToCache(ctx.Request.Context(), getCacheKey(meta), item, ttl)
  334. }()
  335. return do.DoResponse(meta, store, ctx, resp)
  336. }