redis.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. package common
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "reflect"
  8. "strconv"
  9. "time"
  10. "github.com/go-redis/redis/v8"
  11. "gorm.io/gorm"
  12. )
  13. var RDB *redis.Client
  14. var RedisEnabled = true
  15. // InitRedisClient This function is called after init()
  16. func InitRedisClient() (err error) {
  17. if os.Getenv("REDIS_CONN_STRING") == "" {
  18. RedisEnabled = false
  19. SysLog("REDIS_CONN_STRING not set, Redis is not enabled")
  20. return nil
  21. }
  22. if os.Getenv("SYNC_FREQUENCY") == "" {
  23. SysLog("SYNC_FREQUENCY not set, use default value 60")
  24. SyncFrequency = 60
  25. }
  26. SysLog("Redis is enabled")
  27. opt, err := redis.ParseURL(os.Getenv("REDIS_CONN_STRING"))
  28. if err != nil {
  29. FatalLog("failed to parse Redis connection string: " + err.Error())
  30. }
  31. opt.PoolSize = GetEnvOrDefault("REDIS_POOL_SIZE", 10)
  32. RDB = redis.NewClient(opt)
  33. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  34. defer cancel()
  35. _, err = RDB.Ping(ctx).Result()
  36. if err != nil {
  37. FatalLog("Redis ping test failed: " + err.Error())
  38. }
  39. if DebugEnabled {
  40. SysLog(fmt.Sprintf("Redis connected to %s", opt.Addr))
  41. SysLog(fmt.Sprintf("Redis database: %d", opt.DB))
  42. }
  43. return err
  44. }
  45. func ParseRedisOption() *redis.Options {
  46. opt, err := redis.ParseURL(os.Getenv("REDIS_CONN_STRING"))
  47. if err != nil {
  48. FatalLog("failed to parse Redis connection string: " + err.Error())
  49. }
  50. return opt
  51. }
  52. func RedisSet(key string, value string, expiration time.Duration) error {
  53. if DebugEnabled {
  54. SysLog(fmt.Sprintf("Redis SET: key=%s, value=%s, expiration=%v", key, value, expiration))
  55. }
  56. ctx := context.Background()
  57. return RDB.Set(ctx, key, value, expiration).Err()
  58. }
  59. func RedisGet(key string) (string, error) {
  60. if DebugEnabled {
  61. SysLog(fmt.Sprintf("Redis GET: key=%s", key))
  62. }
  63. ctx := context.Background()
  64. val, err := RDB.Get(ctx, key).Result()
  65. return val, err
  66. }
  67. //func RedisExpire(key string, expiration time.Duration) error {
  68. // ctx := context.Background()
  69. // return RDB.Expire(ctx, key, expiration).Err()
  70. //}
  71. //
  72. //func RedisGetEx(key string, expiration time.Duration) (string, error) {
  73. // ctx := context.Background()
  74. // return RDB.GetSet(ctx, key, expiration).Result()
  75. //}
  76. func RedisDel(key string) error {
  77. if DebugEnabled {
  78. SysLog(fmt.Sprintf("Redis DEL: key=%s", key))
  79. }
  80. ctx := context.Background()
  81. return RDB.Del(ctx, key).Err()
  82. }
  83. func RedisDelKey(key string) error {
  84. if DebugEnabled {
  85. SysLog(fmt.Sprintf("Redis DEL Key: key=%s", key))
  86. }
  87. ctx := context.Background()
  88. return RDB.Del(ctx, key).Err()
  89. }
  90. func RedisHSetObj(key string, obj interface{}, expiration time.Duration) error {
  91. if DebugEnabled {
  92. SysLog(fmt.Sprintf("Redis HSET: key=%s, obj=%+v, expiration=%v", key, obj, expiration))
  93. }
  94. ctx := context.Background()
  95. data := make(map[string]interface{})
  96. // 使用反射遍历结构体字段
  97. v := reflect.ValueOf(obj).Elem()
  98. t := v.Type()
  99. for i := 0; i < v.NumField(); i++ {
  100. field := t.Field(i)
  101. value := v.Field(i)
  102. // Skip DeletedAt field
  103. if field.Type.String() == "gorm.DeletedAt" {
  104. continue
  105. }
  106. // 处理指针类型
  107. if value.Kind() == reflect.Ptr {
  108. if value.IsNil() {
  109. data[field.Name] = ""
  110. continue
  111. }
  112. value = value.Elem()
  113. }
  114. // 处理布尔类型
  115. if value.Kind() == reflect.Bool {
  116. data[field.Name] = strconv.FormatBool(value.Bool())
  117. continue
  118. }
  119. // 其他类型直接转换为字符串
  120. data[field.Name] = fmt.Sprintf("%v", value.Interface())
  121. }
  122. txn := RDB.TxPipeline()
  123. txn.HSet(ctx, key, data)
  124. txn.Expire(ctx, key, expiration)
  125. _, err := txn.Exec(ctx)
  126. if err != nil {
  127. return fmt.Errorf("failed to execute transaction: %w", err)
  128. }
  129. return nil
  130. }
  131. func RedisHGetObj(key string, obj interface{}) error {
  132. if DebugEnabled {
  133. SysLog(fmt.Sprintf("Redis HGETALL: key=%s", key))
  134. }
  135. ctx := context.Background()
  136. result, err := RDB.HGetAll(ctx, key).Result()
  137. if err != nil {
  138. return fmt.Errorf("failed to load hash from Redis: %w", err)
  139. }
  140. if len(result) == 0 {
  141. return fmt.Errorf("key %s not found in Redis", key)
  142. }
  143. // Handle both pointer and non-pointer values
  144. val := reflect.ValueOf(obj)
  145. if val.Kind() != reflect.Ptr {
  146. return fmt.Errorf("obj must be a pointer to a struct, got %T", obj)
  147. }
  148. v := val.Elem()
  149. if v.Kind() != reflect.Struct {
  150. return fmt.Errorf("obj must be a pointer to a struct, got pointer to %T", v.Interface())
  151. }
  152. t := v.Type()
  153. for i := 0; i < v.NumField(); i++ {
  154. field := t.Field(i)
  155. fieldName := field.Name
  156. if value, ok := result[fieldName]; ok {
  157. fieldValue := v.Field(i)
  158. // Handle pointer types
  159. if fieldValue.Kind() == reflect.Ptr {
  160. if value == "" {
  161. continue
  162. }
  163. if fieldValue.IsNil() {
  164. fieldValue.Set(reflect.New(fieldValue.Type().Elem()))
  165. }
  166. fieldValue = fieldValue.Elem()
  167. }
  168. // Enhanced type handling for Token struct
  169. switch fieldValue.Kind() {
  170. case reflect.String:
  171. fieldValue.SetString(value)
  172. case reflect.Int, reflect.Int64:
  173. intValue, err := strconv.ParseInt(value, 10, 64)
  174. if err != nil {
  175. return fmt.Errorf("failed to parse int field %s: %w", fieldName, err)
  176. }
  177. fieldValue.SetInt(intValue)
  178. case reflect.Bool:
  179. boolValue, err := strconv.ParseBool(value)
  180. if err != nil {
  181. return fmt.Errorf("failed to parse bool field %s: %w", fieldName, err)
  182. }
  183. fieldValue.SetBool(boolValue)
  184. case reflect.Struct:
  185. // Special handling for gorm.DeletedAt
  186. if fieldValue.Type().String() == "gorm.DeletedAt" {
  187. if value != "" {
  188. timeValue, err := time.Parse(time.RFC3339, value)
  189. if err != nil {
  190. return fmt.Errorf("failed to parse DeletedAt field %s: %w", fieldName, err)
  191. }
  192. fieldValue.Set(reflect.ValueOf(gorm.DeletedAt{Time: timeValue, Valid: true}))
  193. }
  194. }
  195. default:
  196. return fmt.Errorf("unsupported field type: %s for field %s", fieldValue.Kind(), fieldName)
  197. }
  198. }
  199. }
  200. return nil
  201. }
  202. // RedisIncr Add this function to handle atomic increments
  203. func RedisIncr(key string, delta int64) error {
  204. if DebugEnabled {
  205. SysLog(fmt.Sprintf("Redis INCR: key=%s, delta=%d", key, delta))
  206. }
  207. // 检查键的剩余生存时间
  208. ttlCmd := RDB.TTL(context.Background(), key)
  209. ttl, err := ttlCmd.Result()
  210. if err != nil && !errors.Is(err, redis.Nil) {
  211. return fmt.Errorf("failed to get TTL: %w", err)
  212. }
  213. // 只有在 key 存在且有 TTL 时才需要特殊处理
  214. if ttl > 0 {
  215. ctx := context.Background()
  216. // 开始一个Redis事务
  217. txn := RDB.TxPipeline()
  218. // 减少余额
  219. decrCmd := txn.IncrBy(ctx, key, delta)
  220. if err := decrCmd.Err(); err != nil {
  221. return err // 如果减少失败,则直接返回错误
  222. }
  223. // 重新设置过期时间,使用原来的过期时间
  224. txn.Expire(ctx, key, ttl)
  225. // 执行事务
  226. _, err = txn.Exec(ctx)
  227. return err
  228. }
  229. return nil
  230. }
  231. func RedisHIncrBy(key, field string, delta int64) error {
  232. if DebugEnabled {
  233. SysLog(fmt.Sprintf("Redis HINCRBY: key=%s, field=%s, delta=%d", key, field, delta))
  234. }
  235. ttlCmd := RDB.TTL(context.Background(), key)
  236. ttl, err := ttlCmd.Result()
  237. if err != nil && !errors.Is(err, redis.Nil) {
  238. return fmt.Errorf("failed to get TTL: %w", err)
  239. }
  240. if ttl > 0 {
  241. ctx := context.Background()
  242. txn := RDB.TxPipeline()
  243. incrCmd := txn.HIncrBy(ctx, key, field, delta)
  244. if err := incrCmd.Err(); err != nil {
  245. return err
  246. }
  247. txn.Expire(ctx, key, ttl)
  248. _, err = txn.Exec(ctx)
  249. return err
  250. }
  251. return nil
  252. }
  253. func RedisHSetField(key, field string, value interface{}) error {
  254. if DebugEnabled {
  255. SysLog(fmt.Sprintf("Redis HSET field: key=%s, field=%s, value=%v", key, field, value))
  256. }
  257. ttlCmd := RDB.TTL(context.Background(), key)
  258. ttl, err := ttlCmd.Result()
  259. if err != nil && !errors.Is(err, redis.Nil) {
  260. return fmt.Errorf("failed to get TTL: %w", err)
  261. }
  262. if ttl > 0 {
  263. ctx := context.Background()
  264. txn := RDB.TxPipeline()
  265. hsetCmd := txn.HSet(ctx, key, field, value)
  266. if err := hsetCmd.Err(); err != nil {
  267. return err
  268. }
  269. txn.Expire(ctx, key, ttl)
  270. _, err = txn.Exec(ctx)
  271. return err
  272. }
  273. return nil
  274. }