redis.ts 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import { createClient, type RedisClientType } from "redis"
  2. let redis: RedisClientType | undefined
  3. export const redisClient = async () => {
  4. if (!redis) {
  5. redis = createClient({ url: process.env.REDIS_URL || "redis://localhost:6379" })
  6. redis.on("error", (error) => console.error("redis error:", error))
  7. await redis.connect()
  8. }
  9. return redis
  10. }
  11. export const getPubSubKey = (runId: number) => `evals:${runId}`
  12. export const getRunnersKey = (runId: number) => `runners:${runId}`
  13. export const getHeartbeatKey = (runId: number) => `heartbeat:${runId}`
  14. export const registerRunner = async ({
  15. runId,
  16. taskId,
  17. timeoutSeconds,
  18. }: {
  19. runId: number
  20. taskId: number
  21. timeoutSeconds: number
  22. }) => {
  23. const redis = await redisClient()
  24. const runnersKey = getRunnersKey(runId)
  25. await redis.sAdd(runnersKey, `task-${taskId}:${process.env.HOSTNAME ?? process.pid}`)
  26. await redis.expire(runnersKey, timeoutSeconds)
  27. }
  28. export const deregisterRunner = async ({ runId, taskId }: { runId: number; taskId: number }) => {
  29. const redis = await redisClient()
  30. await redis.sRem(getRunnersKey(runId), `task-${taskId}:${process.env.HOSTNAME ?? process.pid}`)
  31. }
  32. export const startHeartbeat = async (runId: number, seconds: number = 10) => {
  33. const pid = process.pid.toString()
  34. const redis = await redisClient()
  35. const heartbeatKey = getHeartbeatKey(runId)
  36. await redis.setEx(heartbeatKey, seconds, pid)
  37. return setInterval(
  38. () =>
  39. redis.expire(heartbeatKey, seconds).catch((error) => {
  40. console.error("heartbeat error:", error)
  41. }),
  42. (seconds * 1_000) / 2,
  43. )
  44. }
  45. export const stopHeartbeat = async (runId: number, heartbeat: NodeJS.Timeout) => {
  46. clearInterval(heartbeat)
  47. try {
  48. const redis = await redisClient()
  49. await redis.del(getHeartbeatKey(runId))
  50. } catch (error) {
  51. console.error("redis.del failed:", error)
  52. }
  53. }