一、并发问题
在 redigo
官方的文档描述中,Receive()
方法是不支持多并发的,原文为:
Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods . No other concurrency is supported including concurrent calls to the Do method .
而 Do()
方法是间接调用了 Receive()
方法,所以 Do()
方法也是不支持多并发的。我们可以用一段代码来验证这一点:
func incr ( i int ) {
fmt . Println ( "Start thread" , i )
err : = gCoon . Send ( "incrby" , "nKey" , fmt . Sprintf ( "%d" , i ) )
checkErr ( err )
err = gCoon . Flush ( )
checkErr ( err )
time . Sleep ( time . Second * time . Duration ( i ) )
rs , err : = redis . Int ( gCoon . Receive ( ) )
checkErr ( err )
fmt . Printf ( "Thread %d Incr success, result is %d
" , i , rs )
}
这里是一个函数,完成了一个简单的 INCRBY
命令,实现 nKey + i
功能,和正常情况不同的是在 Send()
和 Flush()
后不会立刻使用 Receive()
来获取结果,而是让线程休眠一段时间再来获取。
主函数中开启两个 gorutine 运行这段代码:
运行结果:
Start thread 5
Start thread 1
Thread 1 Incr success , result is 5
Thread 5 Incr success , result is 6
可以看到,线程 5
先运行,然后线程 1
运行,由于线程 1
休眠时间短,所以它会先读取输入缓冲区的返回数据,按照预期,线程 1
读到的结果应该是 1
,因为它只是执行了 incr nKey 1 。
而实际上,它读到的却是线程 5
的结果。
从这里我们可以很明显看出这一过程是线程不安全的,即它是不支持多并发的。那么如果要实现并发应该怎么做呢,官方也提出了解决方法,使用线程安全的连接池
来解决这个问题:
For full concurrent access to Redis , use the thread - safe Pool to get , use and release a connection from within a goroutine . Connections returned from a Pool have the concurrency restrictions described in the previous paragraph .
二、连接池
redigo
中的线程池是一个对象:
ype Pool struct {
Dial func ( ) ( Conn , error ) // 连接 redis 的函数
TestOnBorrow func ( c Conn , t time . Time ) error // 测试空闲线程是否正常运行的函数
MaxIdle int //最大的空闲连接数,可以看作是最大连接数
MaxActive int //最大的活跃连接数,默认为 0 不限制
IdleTimeout time . Duration //连接超时时间,默认为 0 表示不做超时限制
Wait bool //当连接超出数量先之后,是否等待到空闲连接释放
}
一个连接池创建的示例为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func newPool ( addr string ) * redis . Pool {
return & redis . Pool {
MaxIdle : 3 ,
IdleTimeout : 240 * time . Second ,
Dial : func ( ) ( redis . Conn , error ) { return redis . Dial ( "tcp" , addr ) } ,
}
}
var (
pool * redis . Pool
redisServer = flag . String ( "redisServer" , ":6379" , "" )
)
func main ( ) {
flag . Parse ( )
pool = newPool ( * redisServer )
. . .
}
当连接池创建完毕之后,如果需要使用连接时调用 pool.Get()
方法即可获得一个可用的连接,此时再执行 Do()
等方法时就不会被其他并发干扰,要注意的是每获取到一个可用的连接并且在使用完之后,一定要通过 conn.Close()
来主动释放连接,以方便下一个应用调用,不然该连接将会一直被占用。
同时官方提供了一个 NewPool()
来创建连接池,但是这个方法是不推荐的:
// NewPool creates a new pool.
//
// Deprecated: Initialize the Pool directory as shown in the example.
func NewPool ( newFn func ( ) ( Conn , error ) , maxIdle int ) * Pool {
return & Pool { Dial : newFn , MaxIdle : maxIdle }
}
三、示例代码
一个完整的示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main
import (
"github.com/garyburd/redigo/redis"
"fmt"
)
//连接池的连接到服务的函数
func newPoolFunc ( ) ( redis . Conn , error ) {
return redis . Dial ( "tcp" , ":6379" )
}
//生成一个连接池对象
func newPool ( ) ( * redis . Pool ) {
return & redis . Pool {
MaxIdle : 10 ,
Dial : newPoolFunc ,
Wait : true ,
}
}
//错误检测
func checkErr ( err error ) {
if err != nil {
panic ( err )
}
}
func main ( ) {
pool : = newPool ( )
conn : = pool . Get ( )
defer conn . Close ( )
rs , err : = redis . Strings ( conn . Do ( "KEYS" , "*" ) )
checkErr ( err )
fmt . Println ( rs ) // [nKey]
fmt . Println ( pool . ActiveCount ( ) ) // 1
conn . Close ( )
}
评论