| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- package cachex
- import (
- "context"
- "errors"
- "strings"
- "sync"
- "time"
- "github.com/go-redis/redis/v8"
- "github.com/samber/hot"
- )
// Deadlines applied to Redis calls. The cache methods take no caller-supplied
// context, so each Redis operation builds its own from context.Background().
const (
	// defaultRedisOpTimeout bounds a single-key GET or SET.
	defaultRedisOpTimeout = 2 * time.Second
	// defaultRedisScanTimeout bounds one complete SCAN iteration, all pages included.
	defaultRedisScanTimeout = 30 * time.Second
	// defaultRedisDelTimeout bounds one pipelined batch of UNLINK commands.
	defaultRedisDelTimeout = 10 * time.Second
)
- type HybridCacheConfig[V any] struct {
- Namespace Namespace
- // Redis is used when RedisEnabled returns true (or RedisEnabled is nil) and Redis is not nil.
- Redis *redis.Client
- RedisCodec ValueCodec[V]
- RedisEnabled func() bool
- // Memory builds a hot cache used when Redis is disabled. Keys stored in memory are fully namespaced.
- Memory func() *hot.HotCache[string, V]
- }
- // HybridCache is a small helper that uses Redis when enabled, otherwise falls back to in-memory hot cache.
- type HybridCache[V any] struct {
- ns Namespace
- redis *redis.Client
- redisCodec ValueCodec[V]
- redisEnabled func() bool
- memOnce sync.Once
- memInit func() *hot.HotCache[string, V]
- mem *hot.HotCache[string, V]
- }
- func NewHybridCache[V any](cfg HybridCacheConfig[V]) *HybridCache[V] {
- return &HybridCache[V]{
- ns: cfg.Namespace,
- redis: cfg.Redis,
- redisCodec: cfg.RedisCodec,
- redisEnabled: cfg.RedisEnabled,
- memInit: cfg.Memory,
- }
- }
- func (c *HybridCache[V]) FullKey(key string) string {
- return c.ns.FullKey(key)
- }
- func (c *HybridCache[V]) redisOn() bool {
- if c.redis == nil || c.redisCodec == nil {
- return false
- }
- if c.redisEnabled == nil {
- return true
- }
- return c.redisEnabled()
- }
- func (c *HybridCache[V]) memCache() *hot.HotCache[string, V] {
- c.memOnce.Do(func() {
- if c.memInit == nil {
- c.mem = hot.NewHotCache[string, V](hot.LRU, 1).Build()
- return
- }
- c.mem = c.memInit()
- })
- return c.mem
- }
- func (c *HybridCache[V]) Get(key string) (value V, found bool, err error) {
- full := c.ns.FullKey(key)
- if full == "" {
- var zero V
- return zero, false, nil
- }
- if c.redisOn() {
- ctx, cancel := context.WithTimeout(context.Background(), defaultRedisOpTimeout)
- defer cancel()
- raw, e := c.redis.Get(ctx, full).Result()
- if e == nil {
- v, decErr := c.redisCodec.Decode(raw)
- if decErr != nil {
- var zero V
- return zero, false, decErr
- }
- return v, true, nil
- }
- if errors.Is(e, redis.Nil) {
- var zero V
- return zero, false, nil
- }
- var zero V
- return zero, false, e
- }
- return c.memCache().Get(full)
- }
- func (c *HybridCache[V]) SetWithTTL(key string, v V, ttl time.Duration) error {
- full := c.ns.FullKey(key)
- if full == "" {
- return nil
- }
- if c.redisOn() {
- raw, err := c.redisCodec.Encode(v)
- if err != nil {
- return err
- }
- ctx, cancel := context.WithTimeout(context.Background(), defaultRedisOpTimeout)
- defer cancel()
- return c.redis.Set(ctx, full, raw, ttl).Err()
- }
- c.memCache().SetWithTTL(full, v, ttl)
- return nil
- }
- // Keys returns keys with valid values. In Redis, it returns all matching keys.
- func (c *HybridCache[V]) Keys() ([]string, error) {
- if c.redisOn() {
- return c.scanKeys(c.ns.MatchPattern())
- }
- return c.memCache().Keys(), nil
- }
- func (c *HybridCache[V]) scanKeys(match string) ([]string, error) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultRedisScanTimeout)
- defer cancel()
- var cursor uint64
- keys := make([]string, 0, 1024)
- for {
- k, next, err := c.redis.Scan(ctx, cursor, match, 1000).Result()
- if err != nil {
- return keys, err
- }
- keys = append(keys, k...)
- cursor = next
- if cursor == 0 {
- break
- }
- }
- return keys, nil
- }
- func (c *HybridCache[V]) Purge() error {
- if c.redisOn() {
- keys, err := c.scanKeys(c.ns.MatchPattern())
- if err != nil {
- return err
- }
- if len(keys) == 0 {
- return nil
- }
- _, err = c.DeleteMany(keys)
- return err
- }
- c.memCache().Purge()
- return nil
- }
- func (c *HybridCache[V]) DeleteByPrefix(prefix string) (int, error) {
- fullPrefix := c.ns.FullKey(prefix)
- if fullPrefix == "" {
- return 0, nil
- }
- if !strings.HasSuffix(fullPrefix, ":") {
- fullPrefix += ":"
- }
- if c.redisOn() {
- match := fullPrefix + "*"
- keys, err := c.scanKeys(match)
- if err != nil {
- return 0, err
- }
- if len(keys) == 0 {
- return 0, nil
- }
- res, err := c.DeleteMany(keys)
- if err != nil {
- return 0, err
- }
- deleted := 0
- for _, ok := range res {
- if ok {
- deleted++
- }
- }
- return deleted, nil
- }
- // In memory, we filter keys and bulk delete.
- allKeys := c.memCache().Keys()
- keys := make([]string, 0, 128)
- for _, k := range allKeys {
- if strings.HasPrefix(k, fullPrefix) {
- keys = append(keys, k)
- }
- }
- if len(keys) == 0 {
- return 0, nil
- }
- res, _ := c.DeleteMany(keys)
- deleted := 0
- for _, ok := range res {
- if ok {
- deleted++
- }
- }
- return deleted, nil
- }
- // DeleteMany accepts either fully namespaced keys or raw keys and deletes them.
- // It returns a map keyed by fully namespaced keys.
- func (c *HybridCache[V]) DeleteMany(keys []string) (map[string]bool, error) {
- res := make(map[string]bool, len(keys))
- if len(keys) == 0 {
- return res, nil
- }
- fullKeys := make([]string, 0, len(keys))
- for _, k := range keys {
- k = c.ns.FullKey(k)
- if k == "" {
- continue
- }
- fullKeys = append(fullKeys, k)
- }
- if len(fullKeys) == 0 {
- return res, nil
- }
- if c.redisOn() {
- ctx, cancel := context.WithTimeout(context.Background(), defaultRedisDelTimeout)
- defer cancel()
- pipe := c.redis.Pipeline()
- cmds := make([]*redis.IntCmd, 0, len(fullKeys))
- for _, k := range fullKeys {
- // UNLINK is non-blocking vs DEL for large key batches.
- cmds = append(cmds, pipe.Unlink(ctx, k))
- }
- _, err := pipe.Exec(ctx)
- if err != nil && !errors.Is(err, redis.Nil) {
- return res, err
- }
- for i, cmd := range cmds {
- deleted := cmd != nil && cmd.Err() == nil && cmd.Val() > 0
- res[fullKeys[i]] = deleted
- }
- return res, nil
- }
- return c.memCache().DeleteMany(fullKeys), nil
- }
- func (c *HybridCache[V]) Capacity() (mainCacheCapacity int, missingCacheCapacity int) {
- if c.redisOn() {
- return 0, 0
- }
- return c.memCache().Capacity()
- }
- func (c *HybridCache[V]) Algorithm() (mainCacheAlgorithm string, missingCacheAlgorithm string) {
- if c.redisOn() {
- return "redis", ""
- }
- return c.memCache().Algorithm()
- }
|