一、并发问题
在 redigo
官方的文档描述中,Receive()
方法是不支持多并发的,原文为:
1 |
Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method. |
而 Do()
方法是间接调用了 Receive()
方法,所以 Do()
方法也是不支持多并发的。我们可以用一段代码来验证这一点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func incr(i int) { fmt.Println("Start thread", i) err := gCoon.Send("incrby", "nKey", fmt.Sprintf("%d", i)) checkErr(err) err = gCoon.Flush() checkErr(err) time.Sleep(time.Second * time.Duration(i)) rs, err := redis.Int(gCoon.Receive()) checkErr(err) fmt.Printf("Thread %d Incr success, result is %d ", i, rs) } |
这里是一个函数,完成了一个简单的 INCRBY
命令,实现 nKey + i
功能,和正常情况不同的是在 Send()
和 Flush()
后不会立刻使用 Receive()
来获取结果,而是让线程休眠一段时间再来获取。
主函数中开启两个 gorutine 运行这段代码:
1 2 |
go incr(5) go incr(1) |
运行结果:
1 2 3 4 |
Start thread 5 Start thread 1 Thread 1 Incr success, result is 5 Thread 5 Incr success, result is 6 |
可以看到,线程 5
先运行,然后线程 1
运行,由于线程 1
休眠时间短,所以它会先读取输入缓冲区的返回数据,按照预期,线程 1
读到的结果应该是 1
,因为它只是执行了 incr nKey 1 。
而实际上,它读到的却是线程 5
的结果。
从这里我们可以很明显看出这一过程是线程不安全的,即它是不支持多并发的。那么如果要实现并发应该怎么做呢,官方也提出了解决方法,使用线程安全的连接池
来解决这个问题:
1 |
For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph. |
二、连接池
redigo
中的线程池是一个对象:
1 2 3 4 5 6 7 8 |
ype Pool struct { Dial func() (Conn, error) // 连接 redis 的函数 TestOnBorrow func(c Conn, t time.Time) error // 测试空闲线程是否正常运行的函数 MaxIdle int //最大的空闲连接数,可以看作是最大连接数 MaxActive int //最大的活跃连接数,默认为 0 不限制 IdleTimeout time.Duration //连接超时时间,默认为 0 表示不做超时限制 Wait bool //当连接超出数量先之后,是否等待到空闲连接释放 } |
一个连接池创建的示例为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
func newPool(addr string) *redis.Pool { return &redis.Pool{ MaxIdle: 3, IdleTimeout: 240 * time.Second, Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) }, } } var ( pool *redis.Pool redisServer = flag.String("redisServer", ":6379", "") ) func main() { flag.Parse() pool = newPool(*redisServer) ... } |
当连接池创建完毕之后,如果需要使用连接时调用 pool.Get()
方法即可获得一个可用的连接,此时再执行 Do()
等方法时就不会被其他并发干扰,要注意的是每获取到一个可用的连接并且在使用完之后,一定要通过 conn.Close()
来主动释放连接,以方便下一个应用调用,不然该连接将会一直被占用。
同时官方提供了一个 NewPool()
来创建连接池,但是这个方法是不推荐的:
1 2 3 4 5 6 |
// NewPool creates a new pool. // // Deprecated: Initialize the Pool directory as shown in the example. func NewPool(newFn func() (Conn, error), maxIdle int) *Pool { return &Pool{Dial: newFn, MaxIdle: maxIdle} } |
三、示例代码
一个完整的示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package main import ( "github.com/garyburd/redigo/redis" "fmt" ) //连接池的连接到服务的函数 func newPoolFunc()(redis.Conn, error){ return redis.Dial("tcp", ":6379") } //生成一个连接池对象 func newPool()(* redis.Pool){ return &redis.Pool{ MaxIdle: 10, Dial: newPoolFunc, Wait: true, } } //错误检测 func checkErr(err error){ if err != nil{ panic(err) } } func main(){ pool := newPool() conn := pool.Get() defer conn.Close() rs, err := redis.Strings(conn.Do("KEYS", "*")) checkErr(err) fmt.Println(rs) // [nKey] fmt.Println(pool.ActiveCount()) // 1 conn.Close() } |
评论