use connection pool for redis cache
This commit is contained in:
		
							parent
							
								
									844412c302
								
							
						
					
					
						commit
						b64e70e7df
					
				
							
								
								
									
										160
									
								
								cache/redis.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										160
									
								
								cache/redis.go
									
									
									
									
										vendored
									
									
								
							| @ -3,7 +3,8 @@ package cache | |||||||
| import ( | import ( | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"io" | 	"sync" | ||||||
|  | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/beego/redigo/redis" | 	"github.com/beego/redigo/redis" | ||||||
| ) | ) | ||||||
| @ -15,9 +16,10 @@ var ( | |||||||
| 
 | 
 | ||||||
| // Redis cache adapter. | // Redis cache adapter. | ||||||
| type RedisCache struct { | type RedisCache struct { | ||||||
| 	c        redis.Conn | 	p        *redis.Pool // redis connection pool | ||||||
| 	conninfo string | 	conninfo string | ||||||
| 	key      string | 	key      string | ||||||
|  | 	mu       sync.Mutex | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // create new redis cache with default collection name. | // create new redis cache with default collection name. | ||||||
| @ -25,23 +27,17 @@ func NewRedisCache() *RedisCache { | |||||||
| 	return &RedisCache{key: DefaultKey} | 	return &RedisCache{key: DefaultKey} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // actually do the redis cmds | ||||||
|  | func (rc *RedisCache) do(commandName string, args ...interface{}) (reply interface{}, err error) { | ||||||
|  | 	c := rc.p.Get() | ||||||
|  | 	defer c.Close() | ||||||
|  | 
 | ||||||
|  | 	return c.Do(commandName, args...) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Get cache from redis. | // Get cache from redis. | ||||||
| func (rc *RedisCache) Get(key string) interface{} { | func (rc *RedisCache) Get(key string) interface{} { | ||||||
| 	if rc.c == nil { | 	v, err := rc.do("HGET", rc.key, key) | ||||||
| 		var err error |  | ||||||
| 		rc.c, err = rc.connectInit() |  | ||||||
| 		if err != nil { |  | ||||||
| 			return nil |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	v, err := rc.c.Do("HGET", rc.key, key) |  | ||||||
| 	// write to closed socket, reset rc.c to nil |  | ||||||
| 	if err == io.EOF { |  | ||||||
| 		rc.c = nil |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| @ -52,61 +48,19 @@ func (rc *RedisCache) Get(key string) interface{} { | |||||||
| // put cache to redis. | // put cache to redis. | ||||||
| // timeout is ignored. | // timeout is ignored. | ||||||
| func (rc *RedisCache) Put(key string, val interface{}, timeout int64) error { | func (rc *RedisCache) Put(key string, val interface{}, timeout int64) error { | ||||||
| 	if rc.c == nil { | 	_, err := rc.do("HSET", rc.key, key, val) | ||||||
| 		var err error |  | ||||||
| 		rc.c, err = rc.connectInit() |  | ||||||
| 		if err != nil { |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	_, err := rc.c.Do("HSET", rc.key, key, val) |  | ||||||
| 	// write to closed socket, reset rc.c to nil |  | ||||||
| 	if err == io.EOF { |  | ||||||
| 		rc.c = nil |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // delete cache in redis. | // delete cache in redis. | ||||||
| func (rc *RedisCache) Delete(key string) error { | func (rc *RedisCache) Delete(key string) error { | ||||||
| 	if rc.c == nil { | 	_, err := rc.do("HDEL", rc.key, key) | ||||||
| 		var err error |  | ||||||
| 		rc.c, err = rc.connectInit() |  | ||||||
| 		if err != nil { |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	_, err := rc.c.Do("HDEL", rc.key, key) |  | ||||||
| 	// write to closed socket, reset rc.c to nil |  | ||||||
| 	if err == io.EOF { |  | ||||||
| 		rc.c = nil |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // check cache exist in redis. | // check cache exist in redis. | ||||||
| func (rc *RedisCache) IsExist(key string) bool { | func (rc *RedisCache) IsExist(key string) bool { | ||||||
| 	if rc.c == nil { | 	v, err := redis.Bool(rc.do("HEXISTS", rc.key, key)) | ||||||
| 		var err error |  | ||||||
| 		rc.c, err = rc.connectInit() |  | ||||||
| 		if err != nil { |  | ||||||
| 			return false |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	v, err := redis.Bool(rc.c.Do("HEXISTS", rc.key, key)) |  | ||||||
| 	// write to closed socket, reset rc.c to nil |  | ||||||
| 	if err == io.EOF { |  | ||||||
| 		rc.c = nil |  | ||||||
| 		return false |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return false | 		return false | ||||||
| 	} | 	} | ||||||
| @ -116,59 +70,19 @@ func (rc *RedisCache) IsExist(key string) bool { | |||||||
| 
 | 
 | ||||||
| // increase counter in redis. | // increase counter in redis. | ||||||
| func (rc *RedisCache) Incr(key string) error { | func (rc *RedisCache) Incr(key string) error { | ||||||
| 	if rc.c == nil { | 	_, err := redis.Bool(rc.do("HINCRBY", rc.key, key, 1)) | ||||||
| 		var err error |  | ||||||
| 		rc.c, err = rc.connectInit() |  | ||||||
| 		if err != nil { |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	_, err := redis.Bool(rc.c.Do("HINCRBY", rc.key, key, 1)) |  | ||||||
| 	// write to closed socket |  | ||||||
| 	if err == io.EOF { |  | ||||||
| 		rc.c = nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // decrease counter in redis. | // decrease counter in redis. | ||||||
| func (rc *RedisCache) Decr(key string) error { | func (rc *RedisCache) Decr(key string) error { | ||||||
| 	if rc.c == nil { | 	_, err := redis.Bool(rc.do("HINCRBY", rc.key, key, -1)) | ||||||
| 		var err error |  | ||||||
| 		rc.c, err = rc.connectInit() |  | ||||||
| 		if err != nil { |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	_, err := redis.Bool(rc.c.Do("HINCRBY", rc.key, key, -1)) |  | ||||||
| 
 |  | ||||||
| 	// write to closed socket |  | ||||||
| 	if err == io.EOF { |  | ||||||
| 		rc.c = nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // clean all cache in redis. delete this redis collection. | // clean all cache in redis. delete this redis collection. | ||||||
| func (rc *RedisCache) ClearAll() error { | func (rc *RedisCache) ClearAll() error { | ||||||
| 	if rc.c == nil { | 	_, err := rc.do("DEL", rc.key) | ||||||
| 		var err error |  | ||||||
| 		rc.c, err = rc.connectInit() |  | ||||||
| 		if err != nil { |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	_, err := rc.c.Do("DEL", rc.key) |  | ||||||
| 	// write to closed socket |  | ||||||
| 	if err == io.EOF { |  | ||||||
| 		rc.c = nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -179,32 +93,46 @@ func (rc *RedisCache) ClearAll() error { | |||||||
| func (rc *RedisCache) StartAndGC(config string) error { | func (rc *RedisCache) StartAndGC(config string) error { | ||||||
| 	var cf map[string]string | 	var cf map[string]string | ||||||
| 	json.Unmarshal([]byte(config), &cf) | 	json.Unmarshal([]byte(config), &cf) | ||||||
|  | 
 | ||||||
| 	if _, ok := cf["key"]; !ok { | 	if _, ok := cf["key"]; !ok { | ||||||
| 		cf["key"] = DefaultKey | 		cf["key"] = DefaultKey | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	if _, ok := cf["conn"]; !ok { | 	if _, ok := cf["conn"]; !ok { | ||||||
| 		return errors.New("config has no conn key") | 		return errors.New("config has no conn key") | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	rc.key = cf["key"] | 	rc.key = cf["key"] | ||||||
| 	rc.conninfo = cf["conn"] | 	rc.conninfo = cf["conn"] | ||||||
| 	var err error | 	rc.connectInit() | ||||||
| 	rc.c, err = rc.connectInit() | 
 | ||||||
| 	if err != nil { | 	c := rc.p.Get() | ||||||
|  | 	defer c.Close() | ||||||
|  | 	if err := c.Err(); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	if rc.c == nil { | 
 | ||||||
| 		return errors.New("dial tcp conn error") |  | ||||||
| 	} |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // connect to redis. | // connect to redis. | ||||||
| func (rc *RedisCache) connectInit() (redis.Conn, error) { | func (rc *RedisCache) connectInit() { | ||||||
| 	c, err := redis.Dial("tcp", rc.conninfo) | 	rc.mu.Lock() | ||||||
| 	if err != nil { | 
 | ||||||
| 		return nil, err | 	// initialize a new pool | ||||||
|  | 	rc.p = &redis.Pool{ | ||||||
|  | 		MaxIdle:     3, | ||||||
|  | 		IdleTimeout: 180 * time.Second, | ||||||
|  | 		Dial: func() (redis.Conn, error) { | ||||||
|  | 			c, err := redis.Dial("tcp", rc.conninfo) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return nil, err | ||||||
|  | 			} | ||||||
|  | 			return c, nil | ||||||
|  | 		}, | ||||||
| 	} | 	} | ||||||
| 	return c, nil | 
 | ||||||
|  | 	rc.mu.Unlock() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func init() { | func init() { | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user