Skip to content

Redis

Redis的基本工作原理

Redis(Remote Dictionary Server,远程字典服务)是一个开源的、高性能的键值存储数据库,常用于缓存、消息队列和数据存储等场景。

Redis的数据从内存读取,增量数据保存到AOF文件,全量数据保存到RDB文件。在读写内存时,会把写命令追加到AOF文件,RDB文件保存Redis中所有的数据。当Redis启动时,根据RDB文件恢复数据到内存,再根据AOF文件把末尾几条未写入RDB的日志恢复数据。

Redis案例应用

连续签到(String)

1.签到之后签到天数加一 2.连续(0点之前签到,否则归零)

go
// addContinuesDays 为用户签到续期
func addContinuesDays(ctx context.Context, userID int64) {
	key := fmt.Sprintf(continuesCheckKey, userID)
	// 1. 连续签到数+1
	err := RedisClient.Incr(ctx, key).Err()
	if err != nil {
		fmt.Errorf("用户[%d]连续签到失败", userID)
	} else {
		expAt := beginningOfDay().Add(48 * time.Hour)
		// 2. 设置签到记录在后天的0点到期
		if err := RedisClient.ExpireAt(ctx, key, expAt).Err(); err != nil {
			panic(err)
		} else {
			// 3. 打印用户续签后的连续签到天数
			day, err := getUserCheckInDays(ctx, userID)
			if err != nil {
				panic(err)
			}
			fmt.Printf("用户[%d]连续签到:%d(天), 过期时间:%s", userID, day, expAt.Format("2006-01-02 15:04:05"))
		}
	}
}

// getUserCheckInDays 获取用户连续签到天数
func getUserCheckInDays(ctx context.Context, userID int64) (int64, error) {
	key := fmt.Sprintf(continuesCheckKey, userID)
	days, err := RedisClient.Get(ctx, key).Result()
	if err != nil {
		return 0, err
	}
	if daysInt, err := strconv.ParseInt(days, 10, 64); err != nil {
		panic(err)
	} else {
		return daysInt, nil
	}
}

// beginningOfDay 获取今天0点时间
func beginningOfDay() time.Time {
	now := time.Now()
	y, m, d := now.Date()
	return time.Date(y, m, d, 0, 0, 0, 0, time.Local)
}

用到的数据结构:String

String 是 Redis 中最基本的数据类型,可以存储任意类型的数据,如文本、数字、图片或序列化的对象等。一个 String 类型的键最大可以存储 512MB 的数据

Redis 的 String 类型底层实现是 SDS(Simple Dynamic String,简单动态字符串)。SDS 由长度、空闲空间和字节数组三部分组成

应用场景:存储计数,Session

消息通知(List)

用List作为消息队列,例如当文章更新时,将更新后的文章推送到ES,用户就能搜索到最新的文章数据。

go
package example

import (
	"context"
	"fmt"
	"strings"
	"time"

	"gitee.com/wedone/redis_course/example/common"
)

const ex04ListenList = "ex04_list_0" // lpush ex04_list_0 AA BB

// Ex04Params Ex04的自定义函数
type Ex04Params struct {
}

func Ex04(ctx context.Context) {
	eventLogger := &common.ConcurrentEventLogger{}
	// new一个并发执行器
	// routineNums是消费端的数量,多消费的场景,可以使用ex04ConsumerPop,使用ex04ConsumerRange存在消息重复消费的问题。
	cInst := common.NewConcurrentRoutine(1, eventLogger)
	// 并发执行用户自定义函数work
	cInst.Run(ctx, Ex04Params{}, ex04ConsumerPop)
	// 按日志时间正序打印日志
	eventLogger.PrintLogs()
}

// ex04ConsumerPop 使用rpop逐条消费队列中的信息,数据从队列中移除
// 生成端使用:lpush ex04_list_0 AA BB
func ex04ConsumerPop(ctx context.Context, cInstParam common.CInstParams) {
	routine := cInstParam.Routine
	for {
		items, err := RedisClient.BRPop(ctx, 0, ex04ListenList).Result()
		if err != nil {
			panic(err)
		}
		fmt.Println(common.LogFormat(routine, "读取文章[%s]标题、正文,发送到ES更新索引", items[1]))
		// 将文章内容推送到ES
		time.Sleep(1 * time.Second)
	}
}

// ex04ConsumerRange 使用lrange批量消费队列中的数据,数据保留在队列中
// 生成端使用:rpush ex04_list_0 AA BB
// 消费端:
// 方法1 lrange ex04_list_0 -3 -1 // 从FIFO队尾中一次消费3条信息
// 方法2 rpop ex04_list_0 3
func ex04ConsumerRange(ctx context.Context, cInstParam common.CInstParams) {
	routine := cInstParam.Routine
	consumeBatchSize := int64(3) // 一次取N个消息
	for {
		// 从index(-consumeBatchSize)开始取,直到最后一个元素index(-1)
		items, err := RedisClient.LRange(ctx, ex04ListenList, -consumeBatchSize, -1).Result()
		if err != nil {
			panic(err)
		}
		if len(items) > 0 {
			fmt.Println(common.LogFormat(routine, "收到信息:%s", strings.Join(items, "->")))
			// 清除已消费的队列
			// 方法1 使用LTrim
			// 保留从index(0)开始到index(-(consumeBatchSize + 1))的部分,即为未消费的部分
			// RedisClient.LTrim(ctx, ex04ListenList, 0, -(consumeBatchSize + 1))

			// 方法2 使用RPop
			RedisClient.RPopCount(ctx, ex04ListenList, int(consumeBatchSize))
		}
		time.Sleep(3 * time.Second)
	}
}

QuickList由双向链表和listpack实现,Redis为了压缩内存,把多个数据存到一个节点里,也就是listpack。listpack头部会放长度以及元素个数,每个元素占的长度一样长。

计数(Hash)

当有多项计数需求时,可以用Hash数据结构。

go
const Ex05UserCountKey = "ex05_user_count"

func Ex05(ctx context.Context, args []string) {
	if len(args) == 0 {
		fmt.Printf("args can NOT be empty\n")
		os.Exit(1)
	}
	arg1 := args[0]
	switch arg1 {
	case "init":
		Ex06InitUserCounter(ctx)
	case "get":
		userID, err := strconv.ParseInt(args[1], 10, 64)
		if err != nil {
			panic(err)
		}
		GetUserCounter(ctx, userID)
	case "incr_like":
		userID, err := strconv.ParseInt(args[1], 10, 64)
		if err != nil {
			panic(err)
		}
		IncrByUserLike(ctx, userID)
	case "incr_collect":
		userID, err := strconv.ParseInt(args[1], 10, 64)
		if err != nil {
			panic(err)
		}
		IncrByUserCollect(ctx, userID)
	case "decr_like":
		userID, err := strconv.ParseInt(args[1], 10, 64)
		if err != nil {
			panic(err)
		}
		DecrByUserLike(ctx, userID)
	case "decr_collect":
		userID, err := strconv.ParseInt(args[1], 10, 64)
		if err != nil {
			panic(err)
		}
		DecrByUserCollect(ctx, userID)
	}

}

func Ex06InitUserCounter(ctx context.Context) {
	pipe := RedisClient.Pipeline()
	userCounters := []map[string]interface{}{
		{"user_id": "1556564194374926", "got_digg_count": 10693, "got_view_count": 2238438, "followee_count": 176, "follower_count": 9895, "follow_collect_set_count": 0, "subscribe_tag_count": 95},
		{"user_id": "1111", "got_digg_count": 19, "got_view_count": 4},
		{"user_id": "2222", "got_digg_count": 1238, "follower_count": 379},
	}
	for _, counter := range userCounters {
		uid, err := strconv.ParseInt(counter["user_id"].(string), 10, 64)
		key := GetUserCounterKey(uid)
		rw, err := pipe.Del(ctx, key).Result()
		if err != nil {
			fmt.Printf("del %s, rw=%d\n", key, rw)
		}
		_, err = pipe.HMSet(ctx, key, counter).Result()
		if err != nil {
			panic(err)
		}

		fmt.Printf("设置 uid=%d, key=%s\n", uid, key)
	}
	// 批量执行上面for循环设置好的hmset命令
	_, err := pipe.Exec(ctx)
	if err != nil { // 报错后进行一次额外尝试
		_, err = pipe.Exec(ctx)
		if err != nil {
			panic(err)
		}
	}
}

func GetUserCounterKey(userID int64) string {
	return fmt.Sprintf("%s_%d", Ex05UserCountKey, userID)
}

func GetUserCounter(ctx context.Context, userID int64) {
	pipe := RedisClient.Pipeline()
	GetUserCounterKey(userID)
	pipe.HGetAll(ctx, GetUserCounterKey(userID))
	cmders, err := pipe.Exec(ctx)
	if err != nil {
		panic(err)
	}
	for _, cmder := range cmders {
		counterMap, err := cmder.(*redis.MapStringStringCmd).Result()
		if err != nil {
			panic(err)
		}
		for field, value := range counterMap {
			fmt.Printf("%s: %s\n", field, value)
		}
	}
}

// IncrByUserLike 点赞数+1
func IncrByUserLike(ctx context.Context, userID int64) {
	incrByUserField(ctx, userID, "got_digg_count")
}

// IncrByUserCollect 收藏数+1
func IncrByUserCollect(ctx context.Context, userID int64) {
	incrByUserField(ctx, userID, "follow_collect_set_count")
}

// DecrByUserLike 点赞数-1
func DecrByUserLike(ctx context.Context, userID int64) {
	decrByUserField(ctx, userID, "got_digg_count")
}

// DecrByUserCollect 收藏数-1
func DecrByUserCollect(ctx context.Context, userID int64) {
	decrByUserField(ctx, userID, "follow_collect_set_count")
}

func incrByUserField(ctx context.Context, userID int64, field string) {
	change(ctx, userID, field, 1)
}

func decrByUserField(ctx context.Context, userID int64, field string) {
	change(ctx, userID, field, -1)
}

func change(ctx context.Context, userID int64, field string, incr int64) {
	redisKey := GetUserCounterKey(userID)
	before, err := RedisClient.HGet(ctx, redisKey, field).Result()
	if err != nil {
		panic(err)
	}
	beforeInt, err := strconv.ParseInt(before, 10, 64)
	if err != nil {
		panic(err)
	}
	if beforeInt+incr < 0 {
		fmt.Printf("禁止变更计数,计数变更后小于0. %d + (%d) = %d\n", beforeInt, incr, beforeInt+incr)
		return
	}
	fmt.Printf("user_id: %d\n更新前\n%s = %s\n--------\n", userID, field, before)
	_, err = RedisClient.HIncrBy(ctx, redisKey, field, incr).Result()
	if err != nil {
		panic(err)
	}
	// fmt.Printf("更新记录[%d]:%d\n", userID, num)
	count, err := RedisClient.HGet(ctx, redisKey, field).Result()
	if err != nil {
		panic(err)
	}
	fmt.Printf("user_id: %d\n更新后\n%s = %s\n--------\n", userID, field, count)
}

使用Hash就好比点外卖,把一顿饭直接打包过来,而不是一道菜一道菜送过来。

Redis中的rehash机制

  1. 哈希表结构

Redis的字典(dict)使用哈希表(dictht)作为其基本存储结构。每个字典包含两个哈希表,用于实现渐进式rehash。哈希表的结构定义如下:

  • 单个节点dictEntry,包含键(key)、值(v)和指向下一个哈希表节点的指针(next)。
  • 哈希表dictht,包含哈希表数组(table)、哈希表大小(size)、大小掩码(sizemask)和已使用的节点数(used)。
  • 字典结构体dict,包含两个哈希表(ht[0]ht[1])、类型特定函数(type)、私有数据(privdata)、rehash索引(rehashidx)和迭代器数量(iterators)。
  1. 渐进式哈希扩容(rehash)

Redis采用渐进式rehash机制,避免一次性完成rehash操作导致的性能瓶颈。具体过程如下:

  1. 初始化rehash

    • rehashidx 被设置为 0,表示rehash开始。
    • ht[1] 分配新的空间,大小通常是 ht[0] 的两倍。
  2. 逐步迁移

    • 每次执行哈希表操作(如 SETGETDEL 等)时,Redis会检查当前是否有需要迁移的元素。
    • 如果有迁移任务,Redis会将一部分元素从 ht[0] 迁移到 ht[1],并更新 rehashidx
    • 迁移过程中,ht[0]ht[1] 同时存在,直到所有元素迁移完成。
  3. 完成rehash

    • 当所有元素迁移到 ht[1] 后,ht[0] 被释放,ht[1] 成为新的哈希表。
    • rehashidx 被设置为 -1,表示rehash完成。
  4. 扩容条件

Redis字典的扩容发生在以下条件下:

  • 负载因子 超过设定阈值(默认为1),即字典中的元素数量超过了字典容量的负载因子。
  • Redis会将哈希表的容量扩大为原来的2倍,以降低负载因子,提高查找效率。

为什么需要rehash?

当哈希表中的元素数量增加到一定程度时,哈希表的负载因子(load factor)会超过一个预设的阈值(默认为1)。负载因子是哈希表中已使用的槽位数与总槽位数的比值。当负载因子过高时,哈希冲突的概率会显著增加,导致查找性能下降。为了维持哈希表的高效性能,Redis会触发rehash操作,将哈希表的大小扩大为原来的两倍。这样可以降低负载因子,减少哈希冲突,提高查找效率

因为不能在迁移数据时阻塞用户读取数据,所以就每次访问顺便迁移一点数据,把迁移平摊到每一次的用户访问

排行榜(ZSet)

go
package example

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"github.com/go-redis/redis/v9"
)

const Ex06RankKey = "ex06_rank_zset"

type Ex06ItemScore struct {
	ItemNam string
	Score   float64
}

// Ex06 排行榜
// go run main.go init // 初始化积分
// go run main.go Ex06 rev_order // 输出完整榜单
// go run main.go  Ex06 order_page 0 // 逆序分页输出,offset=1
// go run main.go  Ex06 get_rank user2 // 获取user2的排名
// go run main.go  Ex06 get_score user2 // 获取user2的分数
// go run main.go  Ex06 add_user_score user2 10 // 为user2设置为10分
// zadd ex06_rank_zset 15 andy
// zincrby ex06_rank_zset -9 andy // andy 扣9分,排名掉到最后一名
func Ex06(ctx context.Context, args []string) {
	arg1 := args[0]
	switch arg1 {
	case "init":
		Ex06InitUserScore(ctx)
	case "rev_order":
		GetRevOrderAllList(ctx, 0, -1)
	case "order_page":
		pageSize := int64(2)
		if len(args[1]) > 0 {
			offset, err := strconv.ParseInt(args[1], 10, 64)
			if err != nil {
				panic(err)
			}
			GetOrderListByPage(ctx, offset, pageSize)
		}
	case "get_rank":
		GetUserRankByName(ctx, args[1])
	case "get_score":
		GetUserScoreByName(ctx, args[1])
	case "add_user_score":
		if len(args) < 3 {
			fmt.Printf("参数错误,可能是缺少需要增加的分值。eg:go run main.go  Ex06 add_user_score user2 10\n")
			return
		}
		score, err := strconv.ParseFloat(args[2], 64)
		if err != nil {
			panic(err)
		}
		AddUserScore(ctx, args[1], score)
	}
	return
}

func Ex06InitUserScore(ctx context.Context) {
	initList := []redis.Z{
		{Member: "user1", Score: 10}, {Member: "user2", Score: 232}, {Member: "user3", Score: 129},
		{Member: "user4", Score: 232},
	}
	// 清空榜单
	if err := RedisClient.Del(ctx, Ex06RankKey).Err(); err != nil {
		panic(err)
	}

	nums, err := RedisClient.ZAdd(ctx, Ex06RankKey, initList...).Result()
	if err != nil {
		panic(err)
	}

	fmt.Printf("初始化榜单Item数量:%d\n", nums)
}

// 榜单逆序输出
// ZRANGE ex06_rank_zset +inf -inf BYSCORE  rev WITHSCORES
// 正序输出
// ZRANGE ex06_rank_zset 0 -1 WITHSCORES
func GetRevOrderAllList(ctx context.Context, limit, offset int64) {
	resList, err := RedisClient.ZRevRangeWithScores(ctx, Ex06RankKey, 0, -1).Result()
	if err != nil {
		panic(err)
	}
	fmt.Printf("\n榜单:\n")
	for i, z := range resList {
		fmt.Printf("第%d%s\t%f\n", i+1, z.Member, z.Score)
	}
}

func GetOrderListByPage(ctx context.Context, offset, pageSize int64) {
	// zrange ex06_rank_zset 300 0 byscore rev limit 1 2 withscores // 取300分到0分之间的排名
	// zrange ex06_rank_zset -inf +inf byscore withscores 正序输出
	// ZRANGE ex06_rank_zset +inf -inf BYSCORE  REV WITHSCORES 逆序输出所有排名
	// zrange ex06_rank_zset +inf -inf byscore rev limit 0 2 withscores 逆序分页输出排名
	zRangeArgs := redis.ZRangeArgs{
		Key:     Ex06RankKey,
		ByScore: true,
		Rev:     true,
		Start:   "-inf",
		Stop:    "+inf",
		Offset:  offset,
		Count:   pageSize,
	}
	resList, err := RedisClient.ZRangeArgsWithScores(ctx, zRangeArgs).Result()
	if err != nil {
		panic(err)
	}
	fmt.Printf("\n榜单(offest=%d, pageSize=%d):\n", offset, pageSize)
	offNum := int(pageSize * offset)
	for i, z := range resList {
		rank := i + 1 + offNum
		fmt.Printf("第%d%s\t%f\n", rank, z.Member, z.Score)
	}
	fmt.Println()
}

// GetUserRankByName 获取用户排名
func GetUserRankByName(ctx context.Context, name string) {
	rank, err := RedisClient.ZRevRank(ctx, Ex06RankKey, name).Result()
	if err != nil {
		fmt.Errorf("error getting name=%s, err=%v", name, err)
		return
	}
	fmt.Printf("name=%s, 排名=%d\n", name, rank+1)
}

// GetUserScoreByName 获取用户分值
func GetUserScoreByName(ctx context.Context, name string) {
	score, err := RedisClient.ZScore(ctx, Ex06RankKey, name).Result()
	if err != nil {
		fmt.Errorf("error getting name=%s, err=%v", name, err)
		return
	}
	fmt.Println(time.Now().UnixMilli())
	fmt.Printf("name=%s, 分数=%f\n", name, score)
}

// AddUserScore 排名用户
func AddUserScore(ctx context.Context, name string, score float64) {
	num, err := RedisClient.ZIncrBy(ctx, Ex06RankKey, score, name).Result()
	if err != nil {
		panic(err)
	}
	fmt.Printf("name=%s, add_score=%f, score=%f\n", name, score, num)
}

Redis的ZSet数据结构

ZSet(有序集合)是Redis中的一种数据结构,它结合了集合和有序的特点。ZSet中的每个成员都关联一个分数(score),Redis会根据分数对成员进行排序。ZSet中的成员是唯一的,但分数可以重复。ZSet非常适合用于实现排行榜、实时分析等功能。

ZSet的底层实现使用了两个数据结构:

  1. 跳表(Skip List)
    • 跳表是一种有序的数据结构,支持快速的插入、删除和查找操作。ZSet中的成员按照分数的大小存储在跳表中,支持范围查询和排序。
  2. 哈希表(Hash Table)
    • 哈希表用于存储成员和分数的映射关系,支持快速的成员查找和分数更新。哈希表中的键是成员,值是分数。

这种结合了跳表和哈希表的实现方式,使得ZSet在插入、删除和查找操作上具有高效的性能

Redis大key、热key问题

Redis 的大 Key(Big Key)和热 Key(Hot Key)是常见的性能瓶颈问题,需要针对性优化。以下是具体解决方案:


一、大 Key(Big Key)问题

定义:大 Key 是指单个 Key 存储的数据量过大(如超过 10KB 的 String、元素过万的 Hash/List、或体积过大的复杂结构)。
风险:内存不均、操作阻塞(如 DEL 卡顿)、网络带宽压力、持久化延迟。

解决方案

  1. 拆分 Key

    • 分片存储:将大 Key 按规则拆分为多个子 Key。
      • 示例:一个 Hash 存储用户信息,按用户 ID 分片为 user_info:{user_id % 10}
    • 分段读取:使用 HSCANSSCAN 等命令分批操作,避免一次性加载全部数据。
  2. 选择高效数据结构

    • 避免大 String 存储 JSON 数据,改用 Hash 结构(字段级读写)。
    • ziplist 优化的 Hash/List(需配置 hash-max-ziplist-entrieshash-max-ziplist-value)。
    • 超大数据集考虑使用 Redis Module(如 RedisJSONRedisSearch)。
  3. 数据压缩

    • 对 String 类型的 Value 使用压缩算法(如 Gzip、Snappy),但需权衡 CPU 开销。
    • 二进制数据可序列化为高效格式(如 Protocol Buffers)。
  4. 异步删除

    • 使用 UNLINK 替代 DEL 命令(Redis 4.0+),非阻塞删除大 Key。
    • 通过 Lazy Free 机制(配置 lazyfree-lazy-user-del yes)实现后台清理。
  5. 分片集群

    • 使用 Redis Cluster 将大 Key 分布到不同节点,分散内存和计算压力。
  6. 预防与监控

    • 通过 redis-cli --bigkeys 扫描大 Key。
    • 使用 MEMORY USAGE key 查看 Key 内存占用。
    • 定期清理过期数据,设置合理的 TTL。
  7. 区分冷热数据

    例如榜单列表场景使用ZSet,只缓存前10页数据,后续数据走d'b


二、热 Key(Hot Key)问题

定义:热 Key 是指被高频访问的 Key(如秒杀商品、热点新闻),导致单节点负载过高。
风险:CPU 打满、网络瓶颈、单点故障。

解决方案

  1. 本地缓存(客户端缓存)

    • 在应用层缓存热 Key 数据(如 Guava、Caffeine),降低 Redis 请求频率。
    • 设置合理的本地缓存过期时间,避免数据不一致。
  2. 多级缓存

    • 结合 Redis + 本地缓存 + CDN,分散压力。例如:
      plaintext
      请求 → CDN → 本地缓存 → Redis → DB
  3. 读写分离

    • 通过 Redis 主从架构,将读请求分流到从节点(需注意主从延迟)。
    • 使用 READONLY 命令强制从节点处理读请求。
  4. Key 拆分

    • 横向拆分:将热 Key 拆分为多个子 Key,如 hot_key:1hot_key:2,通过负载均衡轮询访问。
    • 分桶策略:对热 Key 数据分桶存储,例如按用户 ID 哈希到不同子 Key。
  5. Redis 集群代理

    • 使用代理层(如 Twemproxy、Codis)将热 Key 请求分散到多个节点(需结合分片策略)。
    • 客户端分片:在应用层通过一致性哈希将请求路由到不同实例。
  6. 限流与熔断

    • 对热 Key 的访问限流(如令牌桶算法),防止突发流量击穿 Redis。
    • 熔断机制:在 Redis 压力过大时,暂时降级返回默认值。
  7. Redis 原生方案

    • Redis 6.0+ 客户端缓存:服务端支持客户端缓存(Server-assisted Client-side Caching),减少重复查询。
    • RedisCell 模块:通过 CL.THROTTLE 实现限流。

三、监控与自动化

  1. 大 Key 监控

    • 使用 redis-cli --bigkeys 定期扫描。
    • 通过 MEMORY STATS 分析内存分布。
    • 第三方工具:RedisInsight、Prometheus + Grafana。
  2. 热 Key 监控

    • Redis 命令统计:INFO commandstats 查看高频命令。
    • 使用 redis-cli --hotkeys(Redis 5.0+)识别热 Key。
    • 实时监控工具:KeyDB、阿里云 Redis 的「热 Key 分析」功能。
  3. 自动化处理

    • 通过脚本定期清理大 Key。
    • 结合监控系统触发告警(如超过阈值自动限流)。

四、总结

问题类型核心思路具体方案
大 Key拆分、压缩、异步清理分片存储、UNLINKziplist优化、集群分片
热 Key分散压力、多级缓存本地缓存、读写分离、Key 拆分、限流

关键点

  • 预防优于修复:设计阶段避免大 Key,业务层控制热点。
  • 监控常态化:通过工具实时发现隐患。
  • 结合业务场景:如秒杀场景可提前预热数据,结合本地缓存 + 限流。

Released under the MIT License.