17-Go操作redis

Redis介绍

Redis是一个开源的内存数据库,Redis提供了多种不同类型的数据结构,很多业务场景下的问题都可以很自然地映射到这些数据结构上。

除此之外,通过复制、持久化和客户端分片等特性,我们可以很方便地将Redis扩展成一个能够包含数百GB数据、每秒处理上百万次请求的系统。

1.作缓存
2.作数据库,高并发场景直接用内存数据库

遇见新技术怎么学
1.先快速启动,用起来,解决手里工作问题
2.再遇见其他场景,又用到该技术,再去逐步学
3.千万别用到xx技术,就从安装到基础理论,学一遍,时间精力根本不允许,只会导致你放弃。

Redis支持的数据结构

Redis支持诸如字符串(string)、哈希(hashe)、列表(list)、集合(set)、带范围查询的排序集合(sorted set)、bitmap、hyperloglog、带半径查询的地理空间索引(geospatial index、外卖员、附近饭店)和流(stream)等数据结构。

Redis应用场景

  • 缓存系统,减轻主数据库(MySQL)的压力。
  • 计数场景,比如微博、抖音中的关注数和粉丝数。
  • 热门排行榜,需要排序的场景特别适合使用ZSET。
  • 利用 LIST 可以实现队列的功能。
  • 利用 HyperLogLog 统计UV、PV等数据。
  • 使用 geospatial index 进行地理位置相关查询。

准备Redis环境

读者可以选择在本机安装 redis 或使用云数据库,这里直接使用Docker启动一个 redis 环境,方便学习使用。

使用下面的命令启动一个名为 redis507 的 5.0.7 版本的 redis server环境。

docker run --name redis507 -p 6379:6379 -d redis:5.0.7

注意:此处的版本、容器名和端口号可以根据自己需要设置。

启动一个 redis-cli 连接上面的 redis server。

docker run -it --network host --rm redis:5.0.7 redis-cli

go-redis库

安装

Go 社区中目前有很多成熟的 redis client 库,比如[https://github.com/gomodule/redigohttps://github.com/go-redis/redis,读者可以自行选择适合自己的库。

推荐使用 go-redis 这个库来操作 Redis 数据库。

使用以下命令下安装 go-redis 库。

go get github.com/go-redis/redis/v8

image-20230131175939747

连接语法

普通连接模式

go-redis 库中使用 redis.NewClient 函数连接 Redis 服务器。

rdb := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Password: "", // 密码
    DB:       0,  // 数据库
    PoolSize: 20, // 连接池大小
})

除此之外,还可以使用 redis.ParseURL 函数从表示数据源的字符串中解析得到 Redis 服务器的配置信息。

opt, err := redis.ParseURL("redis://<user>:<pass>@localhost:6379/<db>")
if err != nil {
    panic(err)
}

rdb := redis.NewClient(opt)

TLS连接模式

如果使用的是 TLS 连接方式,则需要使用 tls.Config 配置。

rdb := redis.NewClient(&redis.Options{
    TLSConfig: &tls.Config{
        MinVersion: tls.VersionTLS12,
        // Certificates: []tls.Certificate{cert},
    // ServerName: "your.domain.com",
    },
})

Redis Sentinel模式

使用下面的命令连接到由 Redis Sentinel 管理的 Redis 服务器。

rdb := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:    "master-name",
    SentinelAddrs: []string{":9126", ":9127", ":9128"},
})

Redis Cluster模式

使用下面的命令连接到 Redis Cluster,go-redis 支持按延迟或随机路由命令。

rdb := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},

    // 若要根据延迟或随机路由命令,请启用以下命令之一
    // RouteByLatency: true,
    // RouteRandomly: true,
})

基本使用

连接redis

下面的示例代码演示了 go-redis 库的基本使用。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8" // 1.你用的包版本,v8开始需要传入ctx
    "time"
)

// redis demo

var (
    rdb *redis.Client // 声明一个全局的redis连接对象
)

// initClient 初始化连接
func initClient() (err error) {
    // 此处应该是初始化全局的redis连接对象
    rdb = redis.NewClient(&redis.Options{
        Addr: "yuchaoit.cn:6379",
        // 下面那都是默认值
        Password: "yuchao", // no password set
        DB:       0,        // use default DB

        PoolSize: 100, // 连接池大小
    })
    //ctx用于协程的数据传递,上下文传递,以及协程结束
    //引入ctx后,客户端主动引入超时机制,500毫秒没连上,业务就是有问题的
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    //测试是否连接的通
    //为何需要加入ctx,v8之前版本不用ctx,rdb.Ping需要等待服务端的超时故障

    _, err = rdb.Ping(ctx).Result()
    return err
}

func main() {
    // 初始化全局的redis连接对象rdb
    if err := initClient(); err != nil {
        fmt.Println("init redis client failed, err:", err)
        return
    }
    fmt.Println("连接redis成功啦!请开始使用。。")

    // 使用全局的redis连接对象执行命令,key超时50秒
    rdb.Set(context.Background(), "name", "于超", time.Second*50)

}

执行redis命令

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8" // 1.你用的包版本,v8开始需要传入ctx
    "time"
)

// redis demo

var (
    rdb *redis.Client // 声明一个全局的redis连接对象
)

// initClient 初始化连接
func initClient() (err error) {
    // 此处应该是初始化全局的redis连接对象
    //注意rdb使用的是全局rdb对象,别重新声明,否则经典空指针错误就来了
    rdb = redis.NewClient(&redis.Options{
        Addr: "yuchaoit.cn:6379",
        // 下面那都是默认值
        Password: "yuchao", // no password set
        DB:       0,        // use default DB

        PoolSize: 100, // 连接池大小
    })
    //ctx用于协程的数据传递,上下文传递,以及协程结束
    //引入ctx后,客户端主动引入超时机制,500毫秒没连上,业务就是有问题的
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    //测试是否连接的通
    //为何需要加入ctx,v8之前版本不用ctx,rdb.Ping需要等待服务端的超时故障

    _, err = rdb.Ping(ctx).Result()
    return err
}

// redis基础命令
func demo1() {
    //一般主动设置ctx 超时机制,连接redis的超时
    //一般引入外部请求传过来的ctx
    ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
    defer cancel()

    //拿到命令执行结果
    //只拿result时做错误判断
    //v, err := rdb.Set(ctx, "bj_name", "旭辉奥都", time.Second*50).Result()

    //直接拿到错误
    //只设置key时做判断
    err := rdb.Set(ctx, "bj_name", "旭辉奥都", time.Second*50).Err()
    fmt.Println(err)
}

func main() {
    // 初始化全局的redis连接对象rdb
    if err := initClient(); err != nil {
        fmt.Println("init redis client failed, err:", err)
        return
    }
    fmt.Println("连接redis成功啦!请开始使用。。")

    // 使用全局的redis连接对象执行命令,key超时50秒
    //rdb.Set(context.Background(), "name", "于超", time.Second*50)

    demo1()

}

/*
[yuc-tx-2 root ~]#redis-cli -a yuchao  --raw  get bj_name
旭辉奥都
*/

rdb.Do

go-redis 还提供了一个执行任意命令或自定义命令的 Do 方法,特别是一些 go-redis 库暂时不支持的命令都可以使用该方法执行。具体使用方法如下。


func doDemo() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    // 直接执行命令获取错误,执行任意redis语法的命令
    err := rdb.Do(ctx, "set", "addr", "北京", "EX", 3600).Err()
    fmt.Println(err)

    // 执行命令获取结果
    val, err := rdb.Do(ctx, "get", "addr").Result()
    fmt.Println(val, err)
}

redis.Nil

go-redis 库提供了一个 redis.Nil 错误来表示 Key 不存在的错误。

1.redis命令执行失败

2.redis数据没查到,redis.Nil 数据为空。

因此在使用 go-redis 时需要注意对返回错误的判断。

在某些场景下我们应该区别处理 redis.Nil 和其他不为 nil 的错误。


// getValueFromRedis redis.Nil判断
func getValueFromRedis(key, defaultValue string) (string, error) {
    //客户端主动超时设置 500毫秒
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    val, err := rdb.Get(ctx, key).Result()
    if err != nil {
        // 如果返回的错误是key不存在,就别panic了,而是一个简单错误,可以返回默认值等
        if errors.Is(err, redis.Nil) {
            return defaultValue, nil
        }
        // 出其他错了
        return "", err
    }
    return val, nil
}

func main() {
    // 初始化全局的redis连接对象rdb
    if err := initClient(); err != nil {
        fmt.Println("init redis client failed, err:", err)
        return
    }
    fmt.Println("连接redis成功啦!请开始使用。。")

    // 使用全局的redis连接对象执行命令,key超时50秒
    //rdb.Set(context.Background(), "name", "于超", time.Second*50)

    //demo1()
    //doDemo()
    //res, err := getValueFromRedis("addrr", "key不存在的默认值")
    res, err := getValueFromRedis("addr", "key不存在的默认值")
    fmt.Println(res, err)
}

几种返回值与错误


func manyVal() {
    fmt.Println("如下都是redis查不到数据的错误↓↓")
    fmt.Printf("rdb.Get()执行结果Err() 等于 redis.Nil吗?\t---%#v\n", rdb.Get(context.Background(), "yuyu").Err() == redis.Nil)
    fmt.Printf("rdb.Get()执行结果Err() 等于 nil吗?\t---%#v\n", rdb.Get(context.Background(), "yuyu").Err() == nil)
    fmt.Printf("rdb.Get()执行结果Val() 是什么:\t---%#v\n", rdb.Get(context.Background(), "yuyu").Val())
    v1, err1 := rdb.Get(context.Background(), "yuyu").Result()
    fmt.Printf("rdb.Get()执行结果Result() 是什么,2个返回值:\t---%#v %T\n", v1, err1)

}

/*
➜  goStudy  go run demo.go
连接redis成功啦!请开始使用。。
如下都是redis查不到数据的错误↓↓
rdb.Get()执行结果Err() 等于 redis.Nil吗?       ---true
rdb.Get()执行结果Err() 等于 nil吗?     ---false
rdb.Get()执行结果Val() 是什么: ---""
rdb.Get()执行结果Result() 是什么,2个返回值:   ---"" proto.RedisError
程序结束

*/

zset示例

有序集合,排行榜功能,有排分功能。

下面的示例代码演示了如何使用 go-redis 库操作 zset。

其实本质都还是对应redis基础操作的命令。

key命名常见规则

"app:keyName"

代码


// zsetDemo 操作zset示例
func zsetDemo() {
    // key
    zsetKey := "language_rank"
    // value
    languages := []*redis.Z{
        {Score: 90.0, Member: "Golang"},
        {Score: 98.0, Member: "Java"},
        {Score: 95.0, Member: "Python"},
        {Score: 97.0, Member: "JavaScript"},
        {Score: 99.0, Member: "C/C++"},
    }
    //明显的,go新版本,需要ctx,控制goroutine,协程之间的信息传递
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    // ZADD
    err := rdb.ZAdd(ctx, zsetKey, languages...).Err()
    if err != nil {
        fmt.Printf("zadd failed, err:%v\n", err)
        return
    }
    fmt.Println("zadd success")

    // 把Golang的分数加10,给某个成员,分数+10
    //获取执行结果
    newScore, err := rdb.ZIncrBy(ctx, zsetKey, 10.0, "Golang").Result()
    if err != nil {
        fmt.Printf("zincrby failed, err:%v\n", err)
        return
    }
    fmt.Printf("Golang's score is %f now.\n", newScore)

    // 取分数最高的3个,索引0,1,2
    //val()拿到成员切片
    ret := rdb.ZRevRangeWithScores(ctx, zsetKey, 0, 2).Val()
    for _, z := range ret {
        fmt.Println(z.Member, z.Score)
    }

    // 取95~100分的
    op := &redis.ZRangeBy{
        Min: "95",
        Max: "100",
    }
    ret, err = rdb.ZRangeByScoreWithScores(ctx, zsetKey, op).Result()
    if err != nil {
        fmt.Printf("zrangebyscore failed, err:%v\n", err)
        return
    }
    for _, z := range ret {
        fmt.Println(z.Member, z.Score)
    }
}

Hash示例


// 哈希类型,Redis hash 是一个 string 类型的 field(字段) 和 value(值) 的映射表,hash 特别适合用于存储对象。
func hsetDemo() {
    //创建hash key,设置字段
    err := rdb.HSetNX(context.Background(), "yuchao", "score", 60).Err()
    println(err)
    err = rdb.HSetNX(context.Background(), "yuchao", "high", "183cm").Err()
    println(err)
    //取hash中一个键值对
    v, err := rdb.HGet(context.Background(), "yuchao", "score").Result()
    fmt.Println(v, err)
    //获取多个filed,注意返回值变量v,在go中,是否需要重新赋值!!
    v2, err := rdb.HMGet(context.Background(), "yuchao", "score", "high").Result()
    fmt.Println(v2, err)

}

scan示例

你可以使用KEYS prefix:* 命令按前缀获取所有 key。

vals, err := rdb.Keys(ctx, "prefix*").Result()

但是如果需要扫描数百万的 key ,那速度就会比较慢,且危险。

这种场景下你可以使用Scan 命令来遍历所有符合要求的 key。


// 批量写入key
var wg sync.WaitGroup

func manyKey(i int) {
    defer wg.Done()
    v, err := rdb.Set(context.Background(), fmt.Sprintf("yu%d", i), i, 0).Result()
    log.Println(v, err)
}

// scanKeysDemo1 按前缀查找所有key示例
func scanKeysDemo1() {
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
    defer cancel()

    var cursor uint64
    for {
        var keys []string
        var err error
        // 按前缀扫描key
        keys, cursor, err = rdb.Scan(ctx, cursor, "y*", 0).Result()
        if err != nil {
            panic(err)
        }

        for _, key := range keys {
            fmt.Println("key", key)
        }

        if cursor == 0 { // no more keys
            break
            fmt.Println("done")
        }
    }
}

func main() {
    // 初始化全局的redis连接对象rdb
    if err := initClient(); err != nil {
        fmt.Println("init redis client failed, err:", err)
        return
    }
    fmt.Println("连接redis成功啦!请开始使用。。")

    // 使用全局的redis连接对象执行命令,key超时50秒
    //rdb.Set(context.Background(), "name", "于超", time.Second*50)

    //demo1()
    //doDemo()
    //res, err := getValueFromRedis("addrr", "key不存在的默认值")
    //res, err := getValueFromRedis("addr", "key不存在的默认值")
    //fmt.Println(res, err)

    //zsetDemo()
    //hsetDemo()
    //keysDemo()
    //for i := 0; i < 5000; i++ {
    //    wg.Add(1)
    //    go manyKey(i)
    //}
    //wg.Wait()

    scanKeysDemo1()
    fmt.Println("程序结束")

}

简写

// scanKeysDemo2 按前缀扫描key示例
func scanKeysDemo2() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    // 按前缀扫描key
    iter := rdb.Scan(ctx, 0, "prefix:*", 0).Iterator()
    for iter.Next(ctx) {
        fmt.Println("keys", iter.Val())
    }
    if err := iter.Err(); err != nil {
        panic(err)
    }
}

批量del

例如,我们可以写出一个将所有匹配指定模式的 key 删除的示例。

// delKeysByMatch 按match格式扫描所有key并删除
func delKeysByMatch(match string, timeout time.Duration) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    iter := rdb.Scan(ctx, 0, match, 0).Iterator()
    for iter.Next(ctx) {
        err := rdb.Del(ctx, iter.Val()).Err()
        if err != nil {
            panic(err)
        }
    }
    if err := iter.Err(); err != nil {
        panic(err)
    }
}

此外,对于 Redis 中的 set、hash、zset 数据类型,go-redis 也支持类似的遍历方法。

iter := rdb.SScan(ctx, "set-key", 0, "prefix:*", 0).Iterator()
iter := rdb.HScan(ctx, "hash-key", 0, "prefix:*", 0).Iterator()
iter := rdb.ZScan(ctx, "sorted-hash-key", 0, "prefix:*", 0).Iterator(

pipeline

Redis Pipeline 允许通过使用单个 client-server-client 往返执行多个命令来提高性能。

区别于一个接一个地执行100个命令,你可以将这些命令放入 pipeline 中,然后使用1次读写操作像执行单个命令一样执行它们。

这样做的好处是节省了执行命令的网络往返时间(RTT)。

在下面的示例代码中演示了使用 pipeline 通过一个 write + read 操作来执行多个命令。


// pipeline练习
func pipelineDemo() {
    pipe := rdb.Pipeline()
    incr := pipe.Incr(context.Background(), "chaochao")
    res1 := pipe.Get(context.Background(), "yu999")

    //此时pipe积攒了2个命令,incr,和get,一起执行

    //等到pipe.Exec时,统一执行
    v, err := pipe.Exec(context.Background())
    fmt.Println(v) //v是获取批量执行的结果

    //获取结果
    fmt.Println(incr.Val(), err) //每次自增1
    fmt.Printf("%#v %T\n", res1.Val(), res1.Val())
    //有些方法是有多个返回值,无法直接格式化打印
    //字符串转int
    v1, err := res1.Int()
    fmt.Printf("%T %#v\n", v1, v1)

}

循环取值pipeline

// pipelineDemo2 获取pipeline结果的第二种方式
// 适合批量执行相同类型的命令(返回值类型一致)
func pipelineDemo2() {
    pipe := rdb.Pipeline()
    // 输入命令,利用pipeline组合多个命令,统一作为整体,一次性发送,一次性返回
    for i := 1; i < 10; i++ {
        pipe.Get(context.Background(), fmt.Sprintf("key%d", i))
    }
    // 执行命令
    cmders, err := pipe.Exec(context.Background())
    if err != nil && err != redis.Nil {
        fmt.Println(err)
        return
    }
    // 取结果
    for _, cmder := range cmders {
        v := cmder.String()
        // 因为拿到的cmder是一个接口类型
        // 需要自己根据上面的命令的返回值进行类型断言
        switch cmder.(type) {
        case *redis.StringCmd:
        case *redis.IntCmd:

            // case *redis.StringSliceCmd:
            //     // ...

        }
        // cmder.(*redis.StringCmd).Result()
        // cmder.(*redis.StringSliceCmd).Result()
        fmt.Println(v)
    }
    // 方式2

}

事务

常见是mysql支持,redis事务用的不多,这里是说明redis支持事务。

一个纯go后端工程师,必须精通go语法,可以快速、熟练阅读go代码。

张三的rdb.Set()
李四的rdb.Set()
依然会交替修改key

需求是开启事务,限制任务执行时,其他协程不允许操作。

Redis 是单线程执行命令的,因此单个命令始终是原子的,但是来自不同客户端的两个给定命令可以依次执行,例如在它们之间交替执行。

但是,Multi/exec能够确保在multi/exec两个语句之间的命令之间没有其他客户端正在执行命令。

在这种场景我们需要使用 TxPipeline 或 TxPipelined 方法将 pipeline 命令使用 MULTIEXEC包裹起来。

MULTI
INCR pipeline_counter
EXPIRE pipeline_counts 3600
EXEC

示例代码

// TxPipeline demo
pipe := rdb.TxPipeline()
incr := pipe.Incr(ctx, "tx_pipeline_counter")
pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
_, err := pipe.Exec(ctx)
fmt.Println(incr.Val(), err)

// TxPipelined demo
var incr2 *redis.IntCmd
_, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
    incr2 = pipe.Incr(ctx, "tx_pipeline_counter")
    pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
    return nil
})
fmt.Println(incr2.Val(), err)

Watch

我们通常搭配 WATCH命令来执行事务操作。从使用WATCH命令监视某个 key 开始,直到执行EXEC命令的这段时间里,如果有其他用户抢先对被监视的 key 进行了替换、更新、删除等操作,那么当用户尝试执行EXEC的时候,事务将失败并返回一个错误,用户可以根据这个错误选择重试事务或者放弃事务。

Watch方法接收一个函数和一个或多个key作为参数。

Watch(fn func(*Tx) error, keys ...string) error

下面的代码片段演示了 Watch 方法搭配 TxPipelined 的使用示例。

// watchDemo 在key值不变的情况下将其值+1
func watchDemo(ctx context.Context, key string) error {
    return rdb.Watch(ctx, func(tx *redis.Tx) error {
        n, err := tx.Get(ctx, key).Int()
        if err != nil && err != redis.Nil {
            return err
        }
        // 假设操作耗时5秒
        // 5秒内我们通过其他的客户端修改key,当前事务就会失败
        time.Sleep(5 * time.Second)
        _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
            pipe.Set(ctx, key, n+1, time.Hour)
            return nil
        })
        return err
    }, key)
}

将上面的函数执行并打印其返回值,如果我们在程序运行后的5秒内修改了被 watch 的 key 的值,那么该事务操作失败,返回redis: transaction failed错误。

最后我们来看一个 go-redis 官方文档中使用 GETSETWATCH命令实现一个 INCR 命令的完整示例。

这里很巧妙的,用匿名函数赋值给变量,函数内是不允许再用func关键字的。

const routineCount = 100

increment := func(key string) error {
    txf := func(tx *redis.Tx) error {
        // 获得当前值或零值
        n, err := tx.Get(key).Int()
        if err != nil && err != redis.Nil {
            return err
        }

        // 实际操作(乐观锁定中的本地操作)
        n++

        // 仅在监视的Key保持不变的情况下运行
        _, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
            // pipe 处理错误情况
            pipe.Set(key, n, 0)
            return nil
        })
        return err
    }

    for retries := routineCount; retries > 0; retries-- {
        err := rdb.Watch(txf, key)
        if err != redis.TxFailedErr {
            return err
        }
        // 乐观锁丢失
    }
    return errors.New("increment reached maximum number of retries")
}

var wg sync.WaitGroup
wg.Add(routineCount)
for i := 0; i < routineCount; i++ {
    go func() {
        defer wg.Done()

        if err := increment("counter3"); err != nil {
            fmt.Println("increment error:", err)
        }
    }()
}
wg.Wait()

n, err := rdb.Get("counter3").Int()
fmt.Println("ended with", n, err)

在这个示例中使用了 redis.TxFailedErr 来检查事务是否失败。

更多详情请查阅文档

Copyright © www.yuchaoit.cn 2025 all right reserved,powered by Gitbook作者:于超 2023-02-03 19:01:45

results matching ""

    No results matching ""