Redis
Redis的基本工作原理
Redis(Remote Dictionary Server,远程字典服务)是一个开源的、高性能的键值存储数据库,常用于缓存、消息队列和数据存储等场景。
Redis的数据从内存读取,增量数据保存到AOF文件,全量数据保存到RDB文件。在读写内存时,会把写命令追加到AOF文件,RDB文件保存Redis中所有的数据。当Redis启动时,根据RDB文件恢复数据到内存,再根据AOF文件把末尾几条未写入RDB的日志恢复数据。
Redis案例应用
连续签到(String)
1.签到之后签到天数加一 2.连续(0点之前签到,否则归零)
// addContinuesDays 为用户签到续期
func addContinuesDays(ctx context.Context, userID int64) {
key := fmt.Sprintf(continuesCheckKey, userID)
// 1. 连续签到数+1
err := RedisClient.Incr(ctx, key).Err()
if err != nil {
fmt.Errorf("用户[%d]连续签到失败", userID)
} else {
expAt := beginningOfDay().Add(48 * time.Hour)
// 2. 设置签到记录在后天的0点到期
if err := RedisClient.ExpireAt(ctx, key, expAt).Err(); err != nil {
panic(err)
} else {
// 3. 打印用户续签后的连续签到天数
day, err := getUserCheckInDays(ctx, userID)
if err != nil {
panic(err)
}
fmt.Printf("用户[%d]连续签到:%d(天), 过期时间:%s", userID, day, expAt.Format("2006-01-02 15:04:05"))
}
}
}
// getUserCheckInDays 获取用户连续签到天数
func getUserCheckInDays(ctx context.Context, userID int64) (int64, error) {
key := fmt.Sprintf(continuesCheckKey, userID)
days, err := RedisClient.Get(ctx, key).Result()
if err != nil {
return 0, err
}
if daysInt, err := strconv.ParseInt(days, 10, 64); err != nil {
panic(err)
} else {
return daysInt, nil
}
}
// beginningOfDay 获取今天0点时间
func beginningOfDay() time.Time {
now := time.Now()
y, m, d := now.Date()
return time.Date(y, m, d, 0, 0, 0, 0, time.Local)
}
用到的数据结构:String
String 是 Redis 中最基本的数据类型,可以存储任意类型的数据,如文本、数字、图片或序列化的对象等。一个 String 类型的键最大可以存储 512MB 的数据
Redis 的 String 类型底层实现是 SDS(Simple Dynamic String,简单动态字符串)。SDS 由长度、空闲空间和字节数组三部分组成
应用场景:存储计数,Session
消息通知(List)
用List作为消息队列,例如当文章更新时,将更新后的文章推送到ES,用户就能搜索到最新的文章数据。
package example
import (
"context"
"fmt"
"strings"
"time"
"gitee.com/wedone/redis_course/example/common"
)
const ex04ListenList = "ex04_list_0" // lpush ex04_list_0 AA BB
// Ex04Params Ex04的自定义函数
type Ex04Params struct {
}
func Ex04(ctx context.Context) {
eventLogger := &common.ConcurrentEventLogger{}
// new一个并发执行器
// routineNums是消费端的数量,多消费的场景,可以使用ex04ConsumerPop,使用ex04ConsumerRange存在消息重复消费的问题。
cInst := common.NewConcurrentRoutine(1, eventLogger)
// 并发执行用户自定义函数work
cInst.Run(ctx, Ex04Params{}, ex04ConsumerPop)
// 按日志时间正序打印日志
eventLogger.PrintLogs()
}
// ex04ConsumerPop 使用rpop逐条消费队列中的信息,数据从队列中移除
// 生成端使用:lpush ex04_list_0 AA BB
func ex04ConsumerPop(ctx context.Context, cInstParam common.CInstParams) {
routine := cInstParam.Routine
for {
items, err := RedisClient.BRPop(ctx, 0, ex04ListenList).Result()
if err != nil {
panic(err)
}
fmt.Println(common.LogFormat(routine, "读取文章[%s]标题、正文,发送到ES更新索引", items[1]))
// 将文章内容推送到ES
time.Sleep(1 * time.Second)
}
}
// ex04ConsumerRange 使用lrange批量消费队列中的数据,数据保留在队列中
// 生成端使用:rpush ex04_list_0 AA BB
// 消费端:
// 方法1 lrange ex04_list_0 -3 -1 // 从FIFO队尾中一次消费3条信息
// 方法2 rpop ex04_list_0 3
func ex04ConsumerRange(ctx context.Context, cInstParam common.CInstParams) {
routine := cInstParam.Routine
consumeBatchSize := int64(3) // 一次取N个消息
for {
// 从index(-consumeBatchSize)开始取,直到最后一个元素index(-1)
items, err := RedisClient.LRange(ctx, ex04ListenList, -consumeBatchSize, -1).Result()
if err != nil {
panic(err)
}
if len(items) > 0 {
fmt.Println(common.LogFormat(routine, "收到信息:%s", strings.Join(items, "->")))
// 清除已消费的队列
// 方法1 使用LTrim
// 保留从index(0)开始到index(-(consumeBatchSize + 1))的部分,即为未消费的部分
// RedisClient.LTrim(ctx, ex04ListenList, 0, -(consumeBatchSize + 1))
// 方法2 使用RPop
RedisClient.RPopCount(ctx, ex04ListenList, int(consumeBatchSize))
}
time.Sleep(3 * time.Second)
}
}
QuickList由双向链表和listpack实现,Redis为了压缩内存,把多个数据存到一个节点里,也就是listpack。listpack头部会放长度以及元素个数,每个元素占的长度一样长。
计数(Hash)
当有多项计数需求时,可以用Hash数据结构。
const Ex05UserCountKey = "ex05_user_count"
func Ex05(ctx context.Context, args []string) {
if len(args) == 0 {
fmt.Printf("args can NOT be empty\n")
os.Exit(1)
}
arg1 := args[0]
switch arg1 {
case "init":
Ex06InitUserCounter(ctx)
case "get":
userID, err := strconv.ParseInt(args[1], 10, 64)
if err != nil {
panic(err)
}
GetUserCounter(ctx, userID)
case "incr_like":
userID, err := strconv.ParseInt(args[1], 10, 64)
if err != nil {
panic(err)
}
IncrByUserLike(ctx, userID)
case "incr_collect":
userID, err := strconv.ParseInt(args[1], 10, 64)
if err != nil {
panic(err)
}
IncrByUserCollect(ctx, userID)
case "decr_like":
userID, err := strconv.ParseInt(args[1], 10, 64)
if err != nil {
panic(err)
}
DecrByUserLike(ctx, userID)
case "decr_collect":
userID, err := strconv.ParseInt(args[1], 10, 64)
if err != nil {
panic(err)
}
DecrByUserCollect(ctx, userID)
}
}
func Ex06InitUserCounter(ctx context.Context) {
pipe := RedisClient.Pipeline()
userCounters := []map[string]interface{}{
{"user_id": "1556564194374926", "got_digg_count": 10693, "got_view_count": 2238438, "followee_count": 176, "follower_count": 9895, "follow_collect_set_count": 0, "subscribe_tag_count": 95},
{"user_id": "1111", "got_digg_count": 19, "got_view_count": 4},
{"user_id": "2222", "got_digg_count": 1238, "follower_count": 379},
}
for _, counter := range userCounters {
uid, err := strconv.ParseInt(counter["user_id"].(string), 10, 64)
key := GetUserCounterKey(uid)
rw, err := pipe.Del(ctx, key).Result()
if err != nil {
fmt.Printf("del %s, rw=%d\n", key, rw)
}
_, err = pipe.HMSet(ctx, key, counter).Result()
if err != nil {
panic(err)
}
fmt.Printf("设置 uid=%d, key=%s\n", uid, key)
}
// 批量执行上面for循环设置好的hmset命令
_, err := pipe.Exec(ctx)
if err != nil { // 报错后进行一次额外尝试
_, err = pipe.Exec(ctx)
if err != nil {
panic(err)
}
}
}
func GetUserCounterKey(userID int64) string {
return fmt.Sprintf("%s_%d", Ex05UserCountKey, userID)
}
func GetUserCounter(ctx context.Context, userID int64) {
pipe := RedisClient.Pipeline()
GetUserCounterKey(userID)
pipe.HGetAll(ctx, GetUserCounterKey(userID))
cmders, err := pipe.Exec(ctx)
if err != nil {
panic(err)
}
for _, cmder := range cmders {
counterMap, err := cmder.(*redis.MapStringStringCmd).Result()
if err != nil {
panic(err)
}
for field, value := range counterMap {
fmt.Printf("%s: %s\n", field, value)
}
}
}
// IncrByUserLike 点赞数+1
func IncrByUserLike(ctx context.Context, userID int64) {
incrByUserField(ctx, userID, "got_digg_count")
}
// IncrByUserCollect 收藏数+1
func IncrByUserCollect(ctx context.Context, userID int64) {
incrByUserField(ctx, userID, "follow_collect_set_count")
}
// DecrByUserLike 点赞数-1
func DecrByUserLike(ctx context.Context, userID int64) {
decrByUserField(ctx, userID, "got_digg_count")
}
// DecrByUserCollect 收藏数-1
func DecrByUserCollect(ctx context.Context, userID int64) {
decrByUserField(ctx, userID, "follow_collect_set_count")
}
func incrByUserField(ctx context.Context, userID int64, field string) {
change(ctx, userID, field, 1)
}
func decrByUserField(ctx context.Context, userID int64, field string) {
change(ctx, userID, field, -1)
}
func change(ctx context.Context, userID int64, field string, incr int64) {
redisKey := GetUserCounterKey(userID)
before, err := RedisClient.HGet(ctx, redisKey, field).Result()
if err != nil {
panic(err)
}
beforeInt, err := strconv.ParseInt(before, 10, 64)
if err != nil {
panic(err)
}
if beforeInt+incr < 0 {
fmt.Printf("禁止变更计数,计数变更后小于0. %d + (%d) = %d\n", beforeInt, incr, beforeInt+incr)
return
}
fmt.Printf("user_id: %d\n更新前\n%s = %s\n--------\n", userID, field, before)
_, err = RedisClient.HIncrBy(ctx, redisKey, field, incr).Result()
if err != nil {
panic(err)
}
// fmt.Printf("更新记录[%d]:%d\n", userID, num)
count, err := RedisClient.HGet(ctx, redisKey, field).Result()
if err != nil {
panic(err)
}
fmt.Printf("user_id: %d\n更新后\n%s = %s\n--------\n", userID, field, count)
}
使用Hash就好比点外卖,把一顿饭直接打包过来,而不是一道菜一道菜送过来。
Redis中的rehash机制
- 哈希表结构
Redis的字典(dict
)使用哈希表(dictht
)作为其基本存储结构。每个字典包含两个哈希表,用于实现渐进式rehash。哈希表的结构定义如下:
- 单个节点:
dictEntry
,包含键(key
)、值(v
)和指向下一个哈希表节点的指针(next
)。 - 哈希表:
dictht
,包含哈希表数组(table
)、哈希表大小(size
)、大小掩码(sizemask
)和已使用的节点数(used
)。 - 字典结构体:
dict
,包含两个哈希表(ht[0]
和ht[1]
)、类型特定函数(type
)、私有数据(privdata
)、rehash索引(rehashidx
)和迭代器数量(iterators
)。
- 渐进式哈希扩容(rehash)
Redis采用渐进式rehash机制,避免一次性完成rehash操作导致的性能瓶颈。具体过程如下:
初始化rehash:
rehashidx
被设置为0
,表示rehash开始。- 为
ht[1]
分配新的空间,大小通常是ht[0]
的两倍。
逐步迁移:
- 每次执行哈希表操作(如
SET
、GET
、DEL
等)时,Redis会检查当前是否有需要迁移的元素。 - 如果有迁移任务,Redis会将一部分元素从
ht[0]
迁移到ht[1]
,并更新rehashidx
。 - 迁移过程中,
ht[0]
和ht[1]
同时存在,直到所有元素迁移完成。
- 每次执行哈希表操作(如
完成rehash:
- 当所有元素迁移到
ht[1]
后,ht[0]
被释放,ht[1]
成为新的哈希表。 rehashidx
被设置为-1
,表示rehash完成。
- 当所有元素迁移到
扩容条件
Redis字典的扩容发生在以下条件下:
- 负载因子 超过设定阈值(默认为1),即字典中的元素数量超过了字典容量的负载因子。
- Redis会将哈希表的容量扩大为原来的2倍,以降低负载因子,提高查找效率。
为什么需要rehash?
当哈希表中的元素数量增加到一定程度时,哈希表的负载因子(load factor)会超过一个预设的阈值(默认为1)。负载因子是哈希表中已使用的槽位数与总槽位数的比值。当负载因子过高时,哈希冲突的概率会显著增加,导致查找性能下降。为了维持哈希表的高效性能,Redis会触发rehash
操作,将哈希表的大小扩大为原来的两倍。这样可以降低负载因子,减少哈希冲突,提高查找效率
因为不能在迁移数据时阻塞用户读取数据,所以就每次访问顺便迁移一点数据,把迁移平摊到每一次的用户访问
排行榜(ZSet)
package example
import (
"context"
"fmt"
"strconv"
"time"
"github.com/go-redis/redis/v9"
)
const Ex06RankKey = "ex06_rank_zset"
type Ex06ItemScore struct {
ItemNam string
Score float64
}
// Ex06 排行榜
// go run main.go init // 初始化积分
// go run main.go Ex06 rev_order // 输出完整榜单
// go run main.go Ex06 order_page 0 // 逆序分页输出,offset=1
// go run main.go Ex06 get_rank user2 // 获取user2的排名
// go run main.go Ex06 get_score user2 // 获取user2的分数
// go run main.go Ex06 add_user_score user2 10 // 为user2设置为10分
// zadd ex06_rank_zset 15 andy
// zincrby ex06_rank_zset -9 andy // andy 扣9分,排名掉到最后一名
func Ex06(ctx context.Context, args []string) {
arg1 := args[0]
switch arg1 {
case "init":
Ex06InitUserScore(ctx)
case "rev_order":
GetRevOrderAllList(ctx, 0, -1)
case "order_page":
pageSize := int64(2)
if len(args[1]) > 0 {
offset, err := strconv.ParseInt(args[1], 10, 64)
if err != nil {
panic(err)
}
GetOrderListByPage(ctx, offset, pageSize)
}
case "get_rank":
GetUserRankByName(ctx, args[1])
case "get_score":
GetUserScoreByName(ctx, args[1])
case "add_user_score":
if len(args) < 3 {
fmt.Printf("参数错误,可能是缺少需要增加的分值。eg:go run main.go Ex06 add_user_score user2 10\n")
return
}
score, err := strconv.ParseFloat(args[2], 64)
if err != nil {
panic(err)
}
AddUserScore(ctx, args[1], score)
}
return
}
func Ex06InitUserScore(ctx context.Context) {
initList := []redis.Z{
{Member: "user1", Score: 10}, {Member: "user2", Score: 232}, {Member: "user3", Score: 129},
{Member: "user4", Score: 232},
}
// 清空榜单
if err := RedisClient.Del(ctx, Ex06RankKey).Err(); err != nil {
panic(err)
}
nums, err := RedisClient.ZAdd(ctx, Ex06RankKey, initList...).Result()
if err != nil {
panic(err)
}
fmt.Printf("初始化榜单Item数量:%d\n", nums)
}
// 榜单逆序输出
// ZRANGE ex06_rank_zset +inf -inf BYSCORE rev WITHSCORES
// 正序输出
// ZRANGE ex06_rank_zset 0 -1 WITHSCORES
func GetRevOrderAllList(ctx context.Context, limit, offset int64) {
resList, err := RedisClient.ZRevRangeWithScores(ctx, Ex06RankKey, 0, -1).Result()
if err != nil {
panic(err)
}
fmt.Printf("\n榜单:\n")
for i, z := range resList {
fmt.Printf("第%d名 %s\t%f\n", i+1, z.Member, z.Score)
}
}
func GetOrderListByPage(ctx context.Context, offset, pageSize int64) {
// zrange ex06_rank_zset 300 0 byscore rev limit 1 2 withscores // 取300分到0分之间的排名
// zrange ex06_rank_zset -inf +inf byscore withscores 正序输出
// ZRANGE ex06_rank_zset +inf -inf BYSCORE REV WITHSCORES 逆序输出所有排名
// zrange ex06_rank_zset +inf -inf byscore rev limit 0 2 withscores 逆序分页输出排名
zRangeArgs := redis.ZRangeArgs{
Key: Ex06RankKey,
ByScore: true,
Rev: true,
Start: "-inf",
Stop: "+inf",
Offset: offset,
Count: pageSize,
}
resList, err := RedisClient.ZRangeArgsWithScores(ctx, zRangeArgs).Result()
if err != nil {
panic(err)
}
fmt.Printf("\n榜单(offest=%d, pageSize=%d):\n", offset, pageSize)
offNum := int(pageSize * offset)
for i, z := range resList {
rank := i + 1 + offNum
fmt.Printf("第%d名 %s\t%f\n", rank, z.Member, z.Score)
}
fmt.Println()
}
// GetUserRankByName 获取用户排名
func GetUserRankByName(ctx context.Context, name string) {
rank, err := RedisClient.ZRevRank(ctx, Ex06RankKey, name).Result()
if err != nil {
fmt.Errorf("error getting name=%s, err=%v", name, err)
return
}
fmt.Printf("name=%s, 排名=%d\n", name, rank+1)
}
// GetUserScoreByName 获取用户分值
func GetUserScoreByName(ctx context.Context, name string) {
score, err := RedisClient.ZScore(ctx, Ex06RankKey, name).Result()
if err != nil {
fmt.Errorf("error getting name=%s, err=%v", name, err)
return
}
fmt.Println(time.Now().UnixMilli())
fmt.Printf("name=%s, 分数=%f\n", name, score)
}
// AddUserScore 排名用户
func AddUserScore(ctx context.Context, name string, score float64) {
num, err := RedisClient.ZIncrBy(ctx, Ex06RankKey, score, name).Result()
if err != nil {
panic(err)
}
fmt.Printf("name=%s, add_score=%f, score=%f\n", name, score, num)
}
Redis的ZSet数据结构
ZSet(有序集合)是Redis中的一种数据结构,它结合了集合和有序的特点。ZSet中的每个成员都关联一个分数(score),Redis会根据分数对成员进行排序。ZSet中的成员是唯一的,但分数可以重复。ZSet非常适合用于实现排行榜、实时分析等功能。
ZSet的底层实现使用了两个数据结构:
- 跳表(Skip List):
- 跳表是一种有序的数据结构,支持快速的插入、删除和查找操作。ZSet中的成员按照分数的大小存储在跳表中,支持范围查询和排序。
- 哈希表(Hash Table):
- 哈希表用于存储成员和分数的映射关系,支持快速的成员查找和分数更新。哈希表中的键是成员,值是分数。
这种结合了跳表和哈希表的实现方式,使得ZSet在插入、删除和查找操作上具有高效的性能
Redis大key、热key问题
Redis 的大 Key(Big Key)和热 Key(Hot Key)是常见的性能瓶颈问题,需要针对性优化。以下是具体解决方案:
一、大 Key(Big Key)问题
定义:大 Key 是指单个 Key 存储的数据量过大(如超过 10KB 的 String、元素过万的 Hash/List、或体积过大的复杂结构)。
风险:内存不均、操作阻塞(如 DEL
卡顿)、网络带宽压力、持久化延迟。
解决方案
拆分 Key
- 分片存储:将大 Key 按规则拆分为多个子 Key。
- 示例:一个 Hash 存储用户信息,按用户 ID 分片为
user_info:{user_id % 10}
。
- 示例:一个 Hash 存储用户信息,按用户 ID 分片为
- 分段读取:使用
HSCAN
、SSCAN
等命令分批操作,避免一次性加载全部数据。
- 分片存储:将大 Key 按规则拆分为多个子 Key。
选择高效数据结构
- 避免大 String 存储 JSON 数据,改用 Hash 结构(字段级读写)。
- 用
ziplist
优化的 Hash/List(需配置hash-max-ziplist-entries
和hash-max-ziplist-value
)。 - 超大数据集考虑使用 Redis Module(如 RedisJSON 或 RedisSearch)。
数据压缩
- 对 String 类型的 Value 使用压缩算法(如 Gzip、Snappy),但需权衡 CPU 开销。
- 二进制数据可序列化为高效格式(如 Protocol Buffers)。
异步删除
- 使用
UNLINK
替代DEL
命令(Redis 4.0+),非阻塞删除大 Key。 - 通过
Lazy Free
机制(配置lazyfree-lazy-user-del yes
)实现后台清理。
- 使用
分片集群
- 使用 Redis Cluster 将大 Key 分布到不同节点,分散内存和计算压力。
预防与监控
- 通过
redis-cli --bigkeys
扫描大 Key。 - 使用
MEMORY USAGE key
查看 Key 内存占用。 - 定期清理过期数据,设置合理的 TTL。
- 通过
区分冷热数据
例如榜单列表场景使用ZSet,只缓存前10页数据,后续数据走d'b
二、热 Key(Hot Key)问题
定义:热 Key 是指被高频访问的 Key(如秒杀商品、热点新闻),导致单节点负载过高。
风险:CPU 打满、网络瓶颈、单点故障。
解决方案
本地缓存(客户端缓存)
- 在应用层缓存热 Key 数据(如 Guava、Caffeine),降低 Redis 请求频率。
- 设置合理的本地缓存过期时间,避免数据不一致。
多级缓存
- 结合 Redis + 本地缓存 + CDN,分散压力。例如:plaintext
请求 → CDN → 本地缓存 → Redis → DB
- 结合 Redis + 本地缓存 + CDN,分散压力。例如:
读写分离
- 通过 Redis 主从架构,将读请求分流到从节点(需注意主从延迟)。
- 使用
READONLY
命令强制从节点处理读请求。
Key 拆分
- 横向拆分:将热 Key 拆分为多个子 Key,如
hot_key:1
、hot_key:2
,通过负载均衡轮询访问。 - 分桶策略:对热 Key 数据分桶存储,例如按用户 ID 哈希到不同子 Key。
- 横向拆分:将热 Key 拆分为多个子 Key,如
Redis 集群代理
- 使用代理层(如 Twemproxy、Codis)将热 Key 请求分散到多个节点(需结合分片策略)。
- 客户端分片:在应用层通过一致性哈希将请求路由到不同实例。
限流与熔断
- 对热 Key 的访问限流(如令牌桶算法),防止突发流量击穿 Redis。
- 熔断机制:在 Redis 压力过大时,暂时降级返回默认值。
Redis 原生方案
- Redis 6.0+ 客户端缓存:服务端支持客户端缓存(Server-assisted Client-side Caching),减少重复查询。
- RedisCell 模块:通过
CL.THROTTLE
实现限流。
三、监控与自动化
大 Key 监控
- 使用
redis-cli --bigkeys
定期扫描。 - 通过
MEMORY STATS
分析内存分布。 - 第三方工具:RedisInsight、Prometheus + Grafana。
- 使用
热 Key 监控
- Redis 命令统计:
INFO commandstats
查看高频命令。 - 使用
redis-cli --hotkeys
(Redis 5.0+)识别热 Key。 - 实时监控工具:KeyDB、阿里云 Redis 的「热 Key 分析」功能。
- Redis 命令统计:
自动化处理
- 通过脚本定期清理大 Key。
- 结合监控系统触发告警(如超过阈值自动限流)。
四、总结
问题类型 | 核心思路 | 具体方案 |
---|---|---|
大 Key | 拆分、压缩、异步清理 | 分片存储、UNLINK 、ziplist 优化、集群分片 |
热 Key | 分散压力、多级缓存 | 本地缓存、读写分离、Key 拆分、限流 |
关键点:
- 预防优于修复:设计阶段避免大 Key,业务层控制热点。
- 监控常态化:通过工具实时发现隐患。
- 结合业务场景:如秒杀场景可提前预热数据,结合本地缓存 + 限流。