hybrid_cache.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package cachex
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/go-redis/redis/v8"
  9. "github.com/samber/hot"
  10. )
  11. const (
  12. defaultRedisOpTimeout = 2 * time.Second
  13. defaultRedisScanTimeout = 30 * time.Second
  14. defaultRedisDelTimeout = 10 * time.Second
  15. )
  16. type HybridCacheConfig[V any] struct {
  17. Namespace Namespace
  18. // Redis is used when RedisEnabled returns true (or RedisEnabled is nil) and Redis is not nil.
  19. Redis *redis.Client
  20. RedisCodec ValueCodec[V]
  21. RedisEnabled func() bool
  22. // Memory builds a hot cache used when Redis is disabled. Keys stored in memory are fully namespaced.
  23. Memory func() *hot.HotCache[string, V]
  24. }
  25. // HybridCache is a small helper that uses Redis when enabled, otherwise falls back to in-memory hot cache.
  26. type HybridCache[V any] struct {
  27. ns Namespace
  28. redis *redis.Client
  29. redisCodec ValueCodec[V]
  30. redisEnabled func() bool
  31. memOnce sync.Once
  32. memInit func() *hot.HotCache[string, V]
  33. mem *hot.HotCache[string, V]
  34. }
  35. func NewHybridCache[V any](cfg HybridCacheConfig[V]) *HybridCache[V] {
  36. return &HybridCache[V]{
  37. ns: cfg.Namespace,
  38. redis: cfg.Redis,
  39. redisCodec: cfg.RedisCodec,
  40. redisEnabled: cfg.RedisEnabled,
  41. memInit: cfg.Memory,
  42. }
  43. }
  44. func (c *HybridCache[V]) FullKey(key string) string {
  45. return c.ns.FullKey(key)
  46. }
  47. func (c *HybridCache[V]) redisOn() bool {
  48. if c.redis == nil || c.redisCodec == nil {
  49. return false
  50. }
  51. if c.redisEnabled == nil {
  52. return true
  53. }
  54. return c.redisEnabled()
  55. }
  56. func (c *HybridCache[V]) memCache() *hot.HotCache[string, V] {
  57. c.memOnce.Do(func() {
  58. if c.memInit == nil {
  59. c.mem = hot.NewHotCache[string, V](hot.LRU, 1).Build()
  60. return
  61. }
  62. c.mem = c.memInit()
  63. })
  64. return c.mem
  65. }
  66. func (c *HybridCache[V]) Get(key string) (value V, found bool, err error) {
  67. full := c.ns.FullKey(key)
  68. if full == "" {
  69. var zero V
  70. return zero, false, nil
  71. }
  72. if c.redisOn() {
  73. ctx, cancel := context.WithTimeout(context.Background(), defaultRedisOpTimeout)
  74. defer cancel()
  75. raw, e := c.redis.Get(ctx, full).Result()
  76. if e == nil {
  77. v, decErr := c.redisCodec.Decode(raw)
  78. if decErr != nil {
  79. var zero V
  80. return zero, false, decErr
  81. }
  82. return v, true, nil
  83. }
  84. if errors.Is(e, redis.Nil) {
  85. var zero V
  86. return zero, false, nil
  87. }
  88. var zero V
  89. return zero, false, e
  90. }
  91. return c.memCache().Get(full)
  92. }
  93. func (c *HybridCache[V]) SetWithTTL(key string, v V, ttl time.Duration) error {
  94. full := c.ns.FullKey(key)
  95. if full == "" {
  96. return nil
  97. }
  98. if c.redisOn() {
  99. raw, err := c.redisCodec.Encode(v)
  100. if err != nil {
  101. return err
  102. }
  103. ctx, cancel := context.WithTimeout(context.Background(), defaultRedisOpTimeout)
  104. defer cancel()
  105. return c.redis.Set(ctx, full, raw, ttl).Err()
  106. }
  107. c.memCache().SetWithTTL(full, v, ttl)
  108. return nil
  109. }
  110. // Keys returns keys with valid values. In Redis, it returns all matching keys.
  111. func (c *HybridCache[V]) Keys() ([]string, error) {
  112. if c.redisOn() {
  113. return c.scanKeys(c.ns.MatchPattern())
  114. }
  115. return c.memCache().Keys(), nil
  116. }
  117. func (c *HybridCache[V]) scanKeys(match string) ([]string, error) {
  118. ctx, cancel := context.WithTimeout(context.Background(), defaultRedisScanTimeout)
  119. defer cancel()
  120. var cursor uint64
  121. keys := make([]string, 0, 1024)
  122. for {
  123. k, next, err := c.redis.Scan(ctx, cursor, match, 1000).Result()
  124. if err != nil {
  125. return keys, err
  126. }
  127. keys = append(keys, k...)
  128. cursor = next
  129. if cursor == 0 {
  130. break
  131. }
  132. }
  133. return keys, nil
  134. }
  135. func (c *HybridCache[V]) Purge() error {
  136. if c.redisOn() {
  137. keys, err := c.scanKeys(c.ns.MatchPattern())
  138. if err != nil {
  139. return err
  140. }
  141. if len(keys) == 0 {
  142. return nil
  143. }
  144. _, err = c.DeleteMany(keys)
  145. return err
  146. }
  147. c.memCache().Purge()
  148. return nil
  149. }
  150. func (c *HybridCache[V]) DeleteByPrefix(prefix string) (int, error) {
  151. fullPrefix := c.ns.FullKey(prefix)
  152. if fullPrefix == "" {
  153. return 0, nil
  154. }
  155. if !strings.HasSuffix(fullPrefix, ":") {
  156. fullPrefix += ":"
  157. }
  158. if c.redisOn() {
  159. match := fullPrefix + "*"
  160. keys, err := c.scanKeys(match)
  161. if err != nil {
  162. return 0, err
  163. }
  164. if len(keys) == 0 {
  165. return 0, nil
  166. }
  167. res, err := c.DeleteMany(keys)
  168. if err != nil {
  169. return 0, err
  170. }
  171. deleted := 0
  172. for _, ok := range res {
  173. if ok {
  174. deleted++
  175. }
  176. }
  177. return deleted, nil
  178. }
  179. // In memory, we filter keys and bulk delete.
  180. allKeys := c.memCache().Keys()
  181. keys := make([]string, 0, 128)
  182. for _, k := range allKeys {
  183. if strings.HasPrefix(k, fullPrefix) {
  184. keys = append(keys, k)
  185. }
  186. }
  187. if len(keys) == 0 {
  188. return 0, nil
  189. }
  190. res, _ := c.DeleteMany(keys)
  191. deleted := 0
  192. for _, ok := range res {
  193. if ok {
  194. deleted++
  195. }
  196. }
  197. return deleted, nil
  198. }
  199. // DeleteMany accepts either fully namespaced keys or raw keys and deletes them.
  200. // It returns a map keyed by fully namespaced keys.
  201. func (c *HybridCache[V]) DeleteMany(keys []string) (map[string]bool, error) {
  202. res := make(map[string]bool, len(keys))
  203. if len(keys) == 0 {
  204. return res, nil
  205. }
  206. fullKeys := make([]string, 0, len(keys))
  207. for _, k := range keys {
  208. k = c.ns.FullKey(k)
  209. if k == "" {
  210. continue
  211. }
  212. fullKeys = append(fullKeys, k)
  213. }
  214. if len(fullKeys) == 0 {
  215. return res, nil
  216. }
  217. if c.redisOn() {
  218. ctx, cancel := context.WithTimeout(context.Background(), defaultRedisDelTimeout)
  219. defer cancel()
  220. pipe := c.redis.Pipeline()
  221. cmds := make([]*redis.IntCmd, 0, len(fullKeys))
  222. for _, k := range fullKeys {
  223. // UNLINK is non-blocking vs DEL for large key batches.
  224. cmds = append(cmds, pipe.Unlink(ctx, k))
  225. }
  226. _, err := pipe.Exec(ctx)
  227. if err != nil && !errors.Is(err, redis.Nil) {
  228. return res, err
  229. }
  230. for i, cmd := range cmds {
  231. deleted := cmd != nil && cmd.Err() == nil && cmd.Val() > 0
  232. res[fullKeys[i]] = deleted
  233. }
  234. return res, nil
  235. }
  236. return c.memCache().DeleteMany(fullKeys), nil
  237. }
  238. func (c *HybridCache[V]) Capacity() (mainCacheCapacity int, missingCacheCapacity int) {
  239. if c.redisOn() {
  240. return 0, 0
  241. }
  242. return c.memCache().Capacity()
  243. }
  244. func (c *HybridCache[V]) Algorithm() (mainCacheAlgorithm string, missingCacheAlgorithm string) {
  245. if c.redisOn() {
  246. return "redis", ""
  247. }
  248. return c.memCache().Algorithm()
  249. }