Define error code for cache module
This commit is contained in:
parent
1c69214142
commit
433763fca0
2
client/cache/cache.go
vendored
2
client/cache/cache.go
vendored
@ -56,12 +56,14 @@ type Cache interface {
|
|||||||
// Set a cached value with key and expire time.
|
// Set a cached value with key and expire time.
|
||||||
Put(ctx context.Context, key string, val interface{}, timeout time.Duration) error
|
Put(ctx context.Context, key string, val interface{}, timeout time.Duration) error
|
||||||
// Delete cached value by key.
|
// Delete cached value by key.
|
||||||
|
// Should not return error if key not found
|
||||||
Delete(ctx context.Context, key string) error
|
Delete(ctx context.Context, key string) error
|
||||||
// Increment a cached int value by key, as a counter.
|
// Increment a cached int value by key, as a counter.
|
||||||
Incr(ctx context.Context, key string) error
|
Incr(ctx context.Context, key string) error
|
||||||
// Decrement a cached int value by key, as a counter.
|
// Decrement a cached int value by key, as a counter.
|
||||||
Decr(ctx context.Context, key string) error
|
Decr(ctx context.Context, key string) error
|
||||||
// Check if a cached value exists or not.
|
// Check if a cached value exists or not.
|
||||||
|
// if key is expired, return (false, nil)
|
||||||
IsExist(ctx context.Context, key string) (bool, error)
|
IsExist(ctx context.Context, key string) (bool, error)
|
||||||
// Clear all cache.
|
// Clear all cache.
|
||||||
ClearAll(ctx context.Context) error
|
ClearAll(ctx context.Context) error
|
||||||
|
|||||||
134
client/cache/error_code.go
vendored
134
client/cache/error_code.go
vendored
@ -46,3 +46,137 @@ var NotIntegerType = berror.DefineCode(4002006, moduleName, "NotIntegerType", `
|
|||||||
The type of value is not (u)int (u)int32 (u)int64.
|
The type of value is not (u)int (u)int32 (u)int64.
|
||||||
When you want to call Incr or Decr function of Cache API, you must confirm that the value's type is one of (u)int (u)int32 (u)int64.
|
When you want to call Incr or Decr function of Cache API, you must confirm that the value's type is one of (u)int (u)int32 (u)int64.
|
||||||
`)
|
`)
|
||||||
|
|
||||||
|
var InvalidFileCacheDirectoryLevelCfg = berror.DefineCode(4002007, moduleName, "InvalidFileCacheDirectoryLevelCfg", `
|
||||||
|
You pass invalid DirectoryLevel parameter when you try to StartAndGC file cache instance.
|
||||||
|
This parameter must be a integer, and please check your input.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var InvalidFileCacheEmbedExpiryCfg = berror.DefineCode(4002008, moduleName, "InvalidFileCacheEmbedExpiryCfg", `
|
||||||
|
You pass invalid EmbedExpiry parameter when you try to StartAndGC file cache instance.
|
||||||
|
This parameter must be a integer, and please check your input.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var CreateFileCacheDirFailed = berror.DefineCode(4002009, moduleName, "CreateFileCacheDirFailed", `
|
||||||
|
Beego failed to create file cache directory. There are two cases:
|
||||||
|
1. You pass invalid CachePath parameter. Please check your input.
|
||||||
|
2. Beego doesn't have the permission to create this directory. Please check your file mode.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var InvalidFileCachePath = berror.DefineCode(4002010, moduleName, "InvalidFilePath", `
|
||||||
|
The file path of FileCache is invalid. Please correct the config.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var ReadFileCacheContentFailed = berror.DefineCode(4002011, moduleName, "ReadFileCacheContentFailed", `
|
||||||
|
Usually you won't got this error. It means that Beego cannot read the data from the file.
|
||||||
|
You need to check whether the file exist. Sometimes it may be deleted by other processes.
|
||||||
|
If the file exists, please check the permission that Beego is able to read data from the file.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var InvalidGobEncodedData = berror.DefineCode(4002012, moduleName, "InvalidEncodedData", `
|
||||||
|
The data is invalid. When you try to decode the invalid data, you got this error.
|
||||||
|
Please confirm that the data is encoded by GOB correctly.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var GobEncodeDataFailed = berror.DefineCode(4002013, moduleName, "GobEncodeDataFailed", `
|
||||||
|
Beego could not encode the data to GOB byte array. In general, the data type is invalid.
|
||||||
|
For example, GOB doesn't support function type.
|
||||||
|
Basic types, string, structure, structure pointer are supported.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var KeyExpired = berror.DefineCode(4002014, moduleName, "KeyExpired", `
|
||||||
|
Cache key is expired.
|
||||||
|
You should notice that, a key is expired and then it may be deleted by GC goroutine.
|
||||||
|
So when you query a key which may be expired, you may got this code, or KeyNotExist.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var KeyNotExist = berror.DefineCode(4002015, moduleName, "KeyNotExist", `
|
||||||
|
Key not found.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var MultiGetFailed = berror.DefineCode(4002016, moduleName, "MultiGetFailed", `
|
||||||
|
Get multiple keys failed. Please check the detail msg to find out the root cause.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var InvalidMemoryCacheCfg = berror.DefineCode(4002017, moduleName, "InvalidMemoryCacheCfg", `
|
||||||
|
The config is invalid. Please check your input. It must be a json string.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var InvalidMemCacheCfg = berror.DefineCode(4002018, moduleName, "InvalidMemCacheCfg", `
|
||||||
|
The config is invalid. Please check your input, it must be json string and contains "conn" field.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var InvalidMemCacheValue = berror.DefineCode(4002019, moduleName, "InvalidMemCacheValue", `
|
||||||
|
The value must be string or byte[], please check your input.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var InvalidRedisCacheCfg = berror.DefineCode(4002020, moduleName, "InvalidRedisCacheCfg", `
|
||||||
|
The config must be json string, and has "conn" field.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var InvalidSsdbCacheCfg = berror.DefineCode(4002021, moduleName, "InvalidSsdbCacheCfg", `
|
||||||
|
The config must be json string, and has "conn" field. The value of "conn" field should be "host:port".
|
||||||
|
"port" must be a valid integer.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var InvalidSsdbCacheValue = berror.DefineCode(4002022, moduleName, "InvalidSsdbCacheValue", `
|
||||||
|
SSDB cache only accept string value. Please check your input.
|
||||||
|
`)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
var DeleteFileCacheItemFailed = berror.DefineCode(5002001, moduleName, "DeleteFileCacheItemFailed", `
|
||||||
|
Beego try to delete file cache item failed.
|
||||||
|
Please check whether Beego generated file correctly.
|
||||||
|
And then confirm whether this file is already deleted by other processes or other people.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var MemCacheCurdFailed = berror.DefineCode(5002002, moduleName, "MemCacheError", `
|
||||||
|
When you want to get, put, delete key-value from remote memcache servers, you may get error:
|
||||||
|
1. You pass invalid servers address, so Beego could not connect to remote server;
|
||||||
|
2. The servers address is correct, but there is some net issue. Typically there is some firewalls between application and memcache server;
|
||||||
|
3. Key is invalid. The key's length should be less than 250 and must not contains special characters;
|
||||||
|
4. The response from memcache server is invalid;
|
||||||
|
`)
|
||||||
|
|
||||||
|
var RedisCacheCurdFailed = berror.DefineCode(5002003, moduleName, "RedisCacheCurdFailed", `
|
||||||
|
When Beego uses client to send request to redis server, it failed.
|
||||||
|
1. The server addresses is invalid;
|
||||||
|
2. Network issue, firewall issue or network is unstable;
|
||||||
|
3. Client failed to manage connection. In extreme cases, Beego's redis client didn't maintain connections correctly, for example, Beego try to send request via closed connection;
|
||||||
|
4. The request are huge and redis server spent too much time to process it, and client is timeout;
|
||||||
|
|
||||||
|
In general, if you always got this error whatever you do, in most cases, it was caused by network issue.
|
||||||
|
You could check your network state, and confirm that firewall rules are correct.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var InvalidConnection = berror.DefineCode(5002004, moduleName, "InvalidConnection", `
|
||||||
|
The connection is invalid. Please check your connection info, network, firewall.
|
||||||
|
You could simply uses ping, telnet or write some simple tests to test network.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var DialFailed = berror.DefineCode(5002005, moduleName, "DialFailed", `
|
||||||
|
When Beego try to dial to remote servers, it failed. Please check your connection info and network state, server state.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var SsdbCacheCurdFailed = berror.DefineCode(5002006, moduleName, "SsdbCacheCurdFailed", `
|
||||||
|
When you try to use SSDB cache, it failed. There are many cases:
|
||||||
|
1. servers unavailable;
|
||||||
|
2. network issue, including network unstable, firewall;
|
||||||
|
3. connection issue;
|
||||||
|
4. request are huge and servers spent too much time to process it, got timeout;
|
||||||
|
`)
|
||||||
|
|
||||||
|
var SsdbBadResponse = berror.DefineCode(5002007, moduleName, "SsdbBadResponse", `
|
||||||
|
The reponse from SSDB server is invalid.
|
||||||
|
Usually it indicates something wrong on server side.
|
||||||
|
`)
|
||||||
|
|
||||||
|
var ErrKeyExpired = berror.Error(KeyExpired, "the key is expired")
|
||||||
|
var ErrKeyNotExist = berror.Error(KeyNotExist, "the key isn't exist")
|
||||||
108
client/cache/file.go
vendored
108
client/cache/file.go
vendored
@ -30,7 +30,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/beego/beego/v2/core/berror"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FileCacheItem is basic unit of file cache adapter which
|
// FileCacheItem is basic unit of file cache adapter which
|
||||||
@ -96,24 +96,37 @@ func (fc *FileCache) StartAndGC(config string) error {
|
|||||||
}
|
}
|
||||||
fc.CachePath = cfg[cpKey]
|
fc.CachePath = cfg[cpKey]
|
||||||
fc.FileSuffix = cfg[fsKey]
|
fc.FileSuffix = cfg[fsKey]
|
||||||
fc.DirectoryLevel, _ = strconv.Atoi(cfg[dlKey])
|
fc.DirectoryLevel, err = strconv.Atoi(cfg[dlKey])
|
||||||
fc.EmbedExpiry, _ = strconv.Atoi(cfg[eeKey])
|
if err != nil {
|
||||||
|
return berror.Wrapf(err, InvalidFileCacheDirectoryLevelCfg,
|
||||||
fc.Init()
|
"invalid directory level config, please check your input, it must be integer: %s", cfg[dlKey])
|
||||||
return nil
|
}
|
||||||
|
fc.EmbedExpiry, err = strconv.Atoi(cfg[eeKey])
|
||||||
|
if err != nil {
|
||||||
|
return berror.Wrapf(err, InvalidFileCacheEmbedExpiryCfg,
|
||||||
|
"invalid embed expiry config, please check your input, it must be integer: %s", cfg[eeKey])
|
||||||
|
}
|
||||||
|
return fc.Init()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init makes new a dir for file cache if it does not already exist
|
// Init makes new a dir for file cache if it does not already exist
|
||||||
func (fc *FileCache) Init() {
|
func (fc *FileCache) Init() error {
|
||||||
if ok, _ := exists(fc.CachePath); !ok { // todo : error handle
|
ok, err := exists(fc.CachePath)
|
||||||
_ = os.MkdirAll(fc.CachePath, os.ModePerm) // todo : error handle
|
if err != nil || ok {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
err = os.MkdirAll(fc.CachePath, os.ModePerm)
|
||||||
|
if err != nil {
|
||||||
|
return berror.Wrapf(err, CreateFileCacheDirFailed,
|
||||||
|
"could not create directory, please check the config [%s] and file mode.", fc.CachePath)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getCachedFilename returns an md5 encoded file name.
|
// getCachedFilename returns an md5 encoded file name.
|
||||||
func (fc *FileCache) getCacheFileName(key string) string {
|
func (fc *FileCache) getCacheFileName(key string) (string, error) {
|
||||||
m := md5.New()
|
m := md5.New()
|
||||||
io.WriteString(m, key)
|
_, _ = io.WriteString(m, key)
|
||||||
keyMd5 := hex.EncodeToString(m.Sum(nil))
|
keyMd5 := hex.EncodeToString(m.Sum(nil))
|
||||||
cachePath := fc.CachePath
|
cachePath := fc.CachePath
|
||||||
switch fc.DirectoryLevel {
|
switch fc.DirectoryLevel {
|
||||||
@ -122,18 +135,29 @@ func (fc *FileCache) getCacheFileName(key string) string {
|
|||||||
case 1:
|
case 1:
|
||||||
cachePath = filepath.Join(cachePath, keyMd5[0:2])
|
cachePath = filepath.Join(cachePath, keyMd5[0:2])
|
||||||
}
|
}
|
||||||
|
ok, err := exists(cachePath)
|
||||||
if ok, _ := exists(cachePath); !ok { // todo : error handle
|
if err != nil {
|
||||||
_ = os.MkdirAll(cachePath, os.ModePerm) // todo : error handle
|
return "", err
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
err = os.MkdirAll(cachePath, os.ModePerm)
|
||||||
|
if err != nil {
|
||||||
|
return "", berror.Wrapf(err, CreateFileCacheDirFailed,
|
||||||
|
"could not create the directory: %s", cachePath)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return filepath.Join(cachePath, fmt.Sprintf("%s%s", keyMd5, fc.FileSuffix))
|
return filepath.Join(cachePath, fmt.Sprintf("%s%s", keyMd5, fc.FileSuffix)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get value from file cache.
|
// Get value from file cache.
|
||||||
// if nonexistent or expired return an empty string.
|
// if nonexistent or expired return an empty string.
|
||||||
func (fc *FileCache) Get(ctx context.Context, key string) (interface{}, error) {
|
func (fc *FileCache) Get(ctx context.Context, key string) (interface{}, error) {
|
||||||
fileData, err := FileGetContents(fc.getCacheFileName(key))
|
fn, err := fc.getCacheFileName(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
fileData, err := FileGetContents(fn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -145,7 +169,7 @@ func (fc *FileCache) Get(ctx context.Context, key string) (interface{}, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if to.Expired.Before(time.Now()) {
|
if to.Expired.Before(time.Now()) {
|
||||||
return nil, errors.New("The key is expired")
|
return nil, ErrKeyExpired
|
||||||
}
|
}
|
||||||
return to.Data, nil
|
return to.Data, nil
|
||||||
}
|
}
|
||||||
@ -168,7 +192,7 @@ func (fc *FileCache) GetMulti(ctx context.Context, keys []string) ([]interface{}
|
|||||||
if len(keysErr) == 0 {
|
if len(keysErr) == 0 {
|
||||||
return rc, nil
|
return rc, nil
|
||||||
}
|
}
|
||||||
return rc, errors.New(strings.Join(keysErr, "; "))
|
return rc, berror.Error(MultiGetFailed, strings.Join(keysErr, "; "))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put value into file cache.
|
// Put value into file cache.
|
||||||
@ -188,14 +212,26 @@ func (fc *FileCache) Put(ctx context.Context, key string, val interface{}, timeo
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return FilePutContents(fc.getCacheFileName(key), data)
|
|
||||||
|
fn, err := fc.getCacheFileName(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return FilePutContents(fn, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete file cache value.
|
// Delete file cache value.
|
||||||
func (fc *FileCache) Delete(ctx context.Context, key string) error {
|
func (fc *FileCache) Delete(ctx context.Context, key string) error {
|
||||||
filename := fc.getCacheFileName(key)
|
filename, err := fc.getCacheFileName(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if ok, _ := exists(filename); ok {
|
if ok, _ := exists(filename); ok {
|
||||||
return os.Remove(filename)
|
err = os.Remove(filename)
|
||||||
|
if err != nil {
|
||||||
|
return berror.Wrapf(err, DeleteFileCacheItemFailed,
|
||||||
|
"can not delete this file cache key-value, key is %s and file name is %s", key, filename)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -233,8 +269,11 @@ func (fc *FileCache) Decr(ctx context.Context, key string) error {
|
|||||||
|
|
||||||
// IsExist checks if value exists.
|
// IsExist checks if value exists.
|
||||||
func (fc *FileCache) IsExist(ctx context.Context, key string) (bool, error) {
|
func (fc *FileCache) IsExist(ctx context.Context, key string) (bool, error) {
|
||||||
ret, _ := exists(fc.getCacheFileName(key))
|
fn, err := fc.getCacheFileName(key)
|
||||||
return ret, nil
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return exists(fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClearAll cleans cached files (not implemented)
|
// ClearAll cleans cached files (not implemented)
|
||||||
@ -251,13 +290,19 @@ func exists(path string) (bool, error) {
|
|||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return false, err
|
return false, berror.Wrapf(err, InvalidFileCachePath, "file cache path is invalid: %s", path)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FileGetContents Reads bytes from a file.
|
// FileGetContents Reads bytes from a file.
|
||||||
// if non-existent, create this file.
|
// if non-existent, create this file.
|
||||||
func FileGetContents(filename string) (data []byte, e error) {
|
func FileGetContents(filename string) ([]byte, error) {
|
||||||
return ioutil.ReadFile(filename)
|
data, err := ioutil.ReadFile(filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, berror.Wrapf(err, ReadFileCacheContentFailed,
|
||||||
|
"could not read the data from the file: %s, " +
|
||||||
|
"please confirm that file exist and Beego has the permission to read the content.", filename)
|
||||||
|
}
|
||||||
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilePutContents puts bytes into a file.
|
// FilePutContents puts bytes into a file.
|
||||||
@ -272,16 +317,21 @@ func GobEncode(data interface{}) ([]byte, error) {
|
|||||||
enc := gob.NewEncoder(buf)
|
enc := gob.NewEncoder(buf)
|
||||||
err := enc.Encode(data)
|
err := enc.Encode(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, berror.Wrap(err, GobEncodeDataFailed, "could not encode this data")
|
||||||
}
|
}
|
||||||
return buf.Bytes(), err
|
return buf.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GobDecode Gob decodes a file cache item.
|
// GobDecode Gob decodes a file cache item.
|
||||||
func GobDecode(data []byte, to *FileCacheItem) error {
|
func GobDecode(data []byte, to *FileCacheItem) error {
|
||||||
buf := bytes.NewBuffer(data)
|
buf := bytes.NewBuffer(data)
|
||||||
dec := gob.NewDecoder(buf)
|
dec := gob.NewDecoder(buf)
|
||||||
return dec.Decode(&to)
|
err := dec.Decode(&to)
|
||||||
|
if err != nil {
|
||||||
|
return berror.Wrap(err, InvalidGobEncodedData,
|
||||||
|
"could not decode this data to FileCacheItem. Make sure that the data is encoded by GOB.")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
105
client/cache/file_test.go
vendored
Normal file
105
client/cache/file_test.go
vendored
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
// Copyright 2021 beego
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFileCacheStartAndGC(t *testing.T) {
|
||||||
|
fc := NewFileCache().(*FileCache)
|
||||||
|
err := fc.StartAndGC(`{`)
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
err = fc.StartAndGC(`{}`)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, fc.CachePath, FileCachePath)
|
||||||
|
assert.Equal(t, fc.DirectoryLevel, FileCacheDirectoryLevel)
|
||||||
|
assert.Equal(t, fc.EmbedExpiry, int(FileCacheEmbedExpiry))
|
||||||
|
assert.Equal(t, fc.FileSuffix, FileCacheFileSuffix)
|
||||||
|
|
||||||
|
err = fc.StartAndGC(`{"CachePath":"/cache","FileSuffix":".bin","DirectoryLevel":"2","EmbedExpiry":"0"}`)
|
||||||
|
// could not create dir
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
|
||||||
|
str := getTestCacheFilePath()
|
||||||
|
err = fc.StartAndGC(fmt.Sprintf(`{"CachePath":"%s","FileSuffix":".bin","DirectoryLevel":"2","EmbedExpiry":"0"}`, str))
|
||||||
|
assert.Equal(t, fc.CachePath, str)
|
||||||
|
assert.Equal(t, fc.DirectoryLevel, 2)
|
||||||
|
assert.Equal(t, fc.EmbedExpiry, 0)
|
||||||
|
assert.Equal(t, fc.FileSuffix, ".bin")
|
||||||
|
|
||||||
|
err = fc.StartAndGC(fmt.Sprintf(`{"CachePath":"%s","FileSuffix":".bin","DirectoryLevel":"aaa","EmbedExpiry":"0"}`, str))
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
|
||||||
|
err = fc.StartAndGC(fmt.Sprintf(`{"CachePath":"%s","FileSuffix":".bin","DirectoryLevel":"2","EmbedExpiry":"aaa"}`, str))
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFileCacheInit(t *testing.T) {
|
||||||
|
fc := NewFileCache().(*FileCache)
|
||||||
|
fc.CachePath = "////aaa"
|
||||||
|
err := fc.Init()
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
fc.CachePath = getTestCacheFilePath()
|
||||||
|
err = fc.Init()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFileGetContents(t *testing.T) {
|
||||||
|
data, err := FileGetContents("/bin/aaa")
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
fn := filepath.Join(os.TempDir(), "fileCache.txt")
|
||||||
|
f, err := os.Create(fn)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
_, err = f.WriteString("text")
|
||||||
|
assert.Nil(t, err)
|
||||||
|
data, err = FileGetContents(fn)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, "text", string(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGobEncodeDecode(t *testing.T) {
|
||||||
|
data, err := GobEncode(func() {})
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
data, err = GobEncode(&FileCacheItem{
|
||||||
|
Data: "hello",
|
||||||
|
})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
err = GobDecode([]byte("wrong data"), &FileCacheItem{})
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
dci := &FileCacheItem{}
|
||||||
|
err = GobDecode(data, dci)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, "hello", dci.Data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFileCacheDelete(t *testing.T) {
|
||||||
|
fc := NewFileCache()
|
||||||
|
err := fc.StartAndGC(`{}`)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
err = fc.Delete(context.Background(), "my-key")
|
||||||
|
assert.Nil(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTestCacheFilePath() string {
|
||||||
|
return filepath.Join(os.TempDir(), "test", "file.txt")
|
||||||
|
}
|
||||||
91
client/cache/memcache/memcache.go
vendored
91
client/cache/memcache/memcache.go
vendored
@ -32,7 +32,6 @@ package memcache
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@ -40,6 +39,7 @@ import (
|
|||||||
"github.com/bradfitz/gomemcache/memcache"
|
"github.com/bradfitz/gomemcache/memcache"
|
||||||
|
|
||||||
"github.com/beego/beego/v2/client/cache"
|
"github.com/beego/beego/v2/client/cache"
|
||||||
|
"github.com/beego/beego/v2/core/berror"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Cache Memcache adapter.
|
// Cache Memcache adapter.
|
||||||
@ -55,30 +55,25 @@ func NewMemCache() cache.Cache {
|
|||||||
|
|
||||||
// Get get value from memcache.
|
// Get get value from memcache.
|
||||||
func (rc *Cache) Get(ctx context.Context, key string) (interface{}, error) {
|
func (rc *Cache) Get(ctx context.Context, key string) (interface{}, error) {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if item, err := rc.conn.Get(key); err == nil {
|
if item, err := rc.conn.Get(key); err == nil {
|
||||||
return item.Value, nil
|
return item.Value, nil
|
||||||
} else {
|
} else {
|
||||||
return nil, err
|
return nil, berror.Wrapf(err, cache.MemCacheCurdFailed,
|
||||||
|
"could not read data from memcache, please check your key, network and connection. Root cause: %s",
|
||||||
|
err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMulti gets a value from a key in memcache.
|
// GetMulti gets a value from a key in memcache.
|
||||||
func (rc *Cache) GetMulti(ctx context.Context, keys []string) ([]interface{}, error) {
|
func (rc *Cache) GetMulti(ctx context.Context, keys []string) ([]interface{}, error) {
|
||||||
rv := make([]interface{}, len(keys))
|
rv := make([]interface{}, len(keys))
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return rv, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mv, err := rc.conn.GetMulti(keys)
|
mv, err := rc.conn.GetMulti(keys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rv, err
|
return rv, berror.Wrapf(err, cache.MemCacheCurdFailed,
|
||||||
|
"could not read multiple key-values from memcache, " +
|
||||||
|
"please check your keys, network and connection. Root cause: %s",
|
||||||
|
err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
keysErr := make([]string, 0)
|
keysErr := make([]string, 0)
|
||||||
@ -93,78 +88,54 @@ func (rc *Cache) GetMulti(ctx context.Context, keys []string) ([]interface{}, er
|
|||||||
if len(keysErr) == 0 {
|
if len(keysErr) == 0 {
|
||||||
return rv, nil
|
return rv, nil
|
||||||
}
|
}
|
||||||
return rv, fmt.Errorf(strings.Join(keysErr, "; "))
|
return rv, berror.Error(cache.MultiGetFailed, strings.Join(keysErr, "; "))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put puts a value into memcache.
|
// Put puts a value into memcache.
|
||||||
func (rc *Cache) Put(ctx context.Context, key string, val interface{}, timeout time.Duration) error {
|
func (rc *Cache) Put(ctx context.Context, key string, val interface{}, timeout time.Duration) error {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
item := memcache.Item{Key: key, Expiration: int32(timeout / time.Second)}
|
item := memcache.Item{Key: key, Expiration: int32(timeout / time.Second)}
|
||||||
if v, ok := val.([]byte); ok {
|
if v, ok := val.([]byte); ok {
|
||||||
item.Value = v
|
item.Value = v
|
||||||
} else if str, ok := val.(string); ok {
|
} else if str, ok := val.(string); ok {
|
||||||
item.Value = []byte(str)
|
item.Value = []byte(str)
|
||||||
} else {
|
} else {
|
||||||
return errors.New("val only support string and []byte")
|
return berror.Errorf(cache.InvalidMemCacheValue,
|
||||||
|
"the value must be string or byte[]. key: %s, value:%V", key, val)
|
||||||
}
|
}
|
||||||
return rc.conn.Set(&item)
|
return berror.Wrapf(rc.conn.Set(&item), cache.MemCacheCurdFailed,
|
||||||
|
"could not put key-value to memcache, key: %s", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes a value in memcache.
|
// Delete deletes a value in memcache.
|
||||||
func (rc *Cache) Delete(ctx context.Context, key string) error {
|
func (rc *Cache) Delete(ctx context.Context, key string) error {
|
||||||
if rc.conn == nil {
|
return berror.Wrapf(rc.conn.Delete(key), cache.MemCacheCurdFailed,
|
||||||
if err := rc.connectInit(); err != nil {
|
"could not delete key-value from memcache, key: %s", key)
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return rc.conn.Delete(key)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Incr increases counter.
|
// Incr increases counter.
|
||||||
func (rc *Cache) Incr(ctx context.Context, key string) error {
|
func (rc *Cache) Incr(ctx context.Context, key string) error {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := rc.conn.Increment(key, 1)
|
_, err := rc.conn.Increment(key, 1)
|
||||||
return err
|
return berror.Wrapf(err, cache.MemCacheCurdFailed,
|
||||||
|
"could not increase value for key: %s", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decr decreases counter.
|
// Decr decreases counter.
|
||||||
func (rc *Cache) Decr(ctx context.Context, key string) error {
|
func (rc *Cache) Decr(ctx context.Context, key string) error {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := rc.conn.Decrement(key, 1)
|
_, err := rc.conn.Decrement(key, 1)
|
||||||
return err
|
return berror.Wrapf(err, cache.MemCacheCurdFailed,
|
||||||
|
"could not decrease value for key: %s", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsExist checks if a value exists in memcache.
|
// IsExist checks if a value exists in memcache.
|
||||||
func (rc *Cache) IsExist(ctx context.Context, key string) (bool, error) {
|
func (rc *Cache) IsExist(ctx context.Context, key string) (bool, error) {
|
||||||
if rc.conn == nil {
|
_, err := rc.Get(ctx, key)
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := rc.conn.Get(key)
|
|
||||||
return err == nil, err
|
return err == nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClearAll clears all cache in memcache.
|
// ClearAll clears all cache in memcache.
|
||||||
func (rc *Cache) ClearAll(context.Context) error {
|
func (rc *Cache) ClearAll(context.Context) error {
|
||||||
if rc.conn == nil {
|
return berror.Wrap(rc.conn.FlushAll(), cache.MemCacheCurdFailed,
|
||||||
if err := rc.connectInit(); err != nil {
|
"try to clear all key-value pairs failed")
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return rc.conn.FlushAll()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartAndGC starts the memcache adapter.
|
// StartAndGC starts the memcache adapter.
|
||||||
@ -172,21 +143,15 @@ func (rc *Cache) ClearAll(context.Context) error {
|
|||||||
// If an error occurs during connecting, an error is returned
|
// If an error occurs during connecting, an error is returned
|
||||||
func (rc *Cache) StartAndGC(config string) error {
|
func (rc *Cache) StartAndGC(config string) error {
|
||||||
var cf map[string]string
|
var cf map[string]string
|
||||||
json.Unmarshal([]byte(config), &cf)
|
if err := json.Unmarshal([]byte(config), &cf); err != nil {
|
||||||
|
return berror.Wrapf(err, cache.InvalidMemCacheCfg,
|
||||||
|
"could not unmarshal this config, it must be valid json stringP: %s", config)
|
||||||
|
}
|
||||||
|
|
||||||
if _, ok := cf["conn"]; !ok {
|
if _, ok := cf["conn"]; !ok {
|
||||||
return errors.New("config has no conn key")
|
return berror.Errorf(cache.InvalidMemCacheCfg, `config must contains "conn" field: %s`, config)
|
||||||
}
|
}
|
||||||
rc.conninfo = strings.Split(cf["conn"], ";")
|
rc.conninfo = strings.Split(cf["conn"], ";")
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// connect to memcache and keep the connection.
|
|
||||||
func (rc *Cache) connectInit() error {
|
|
||||||
rc.conn = memcache.New(rc.conninfo...)
|
rc.conn = memcache.New(rc.conninfo...)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
27
client/cache/memory.go
vendored
27
client/cache/memory.go
vendored
@ -17,11 +17,12 @@ package cache
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/beego/beego/v2/core/berror"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -64,13 +65,14 @@ func NewMemoryCache() Cache {
|
|||||||
func (bc *MemoryCache) Get(ctx context.Context, key string) (interface{}, error) {
|
func (bc *MemoryCache) Get(ctx context.Context, key string) (interface{}, error) {
|
||||||
bc.RLock()
|
bc.RLock()
|
||||||
defer bc.RUnlock()
|
defer bc.RUnlock()
|
||||||
if itm, ok := bc.items[key]; ok {
|
if itm, ok :=
|
||||||
|
bc.items[key]; ok {
|
||||||
if itm.isExpire() {
|
if itm.isExpire() {
|
||||||
return nil, errors.New("the key is expired")
|
return nil, ErrKeyExpired
|
||||||
}
|
}
|
||||||
return itm.val, nil
|
return itm.val, nil
|
||||||
}
|
}
|
||||||
return nil, errors.New("the key isn't exist")
|
return nil, ErrKeyNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMulti gets caches from memory.
|
// GetMulti gets caches from memory.
|
||||||
@ -91,7 +93,7 @@ func (bc *MemoryCache) GetMulti(ctx context.Context, keys []string) ([]interface
|
|||||||
if len(keysErr) == 0 {
|
if len(keysErr) == 0 {
|
||||||
return rc, nil
|
return rc, nil
|
||||||
}
|
}
|
||||||
return rc, errors.New(strings.Join(keysErr, "; "))
|
return rc, berror.Error(MultiGetFailed, strings.Join(keysErr, "; "))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put puts cache into memory.
|
// Put puts cache into memory.
|
||||||
@ -108,16 +110,11 @@ func (bc *MemoryCache) Put(ctx context.Context, key string, val interface{}, tim
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delete cache in memory.
|
// Delete cache in memory.
|
||||||
|
// If the key is not found, it will not return error
|
||||||
func (bc *MemoryCache) Delete(ctx context.Context, key string) error {
|
func (bc *MemoryCache) Delete(ctx context.Context, key string) error {
|
||||||
bc.Lock()
|
bc.Lock()
|
||||||
defer bc.Unlock()
|
defer bc.Unlock()
|
||||||
if _, ok := bc.items[key]; !ok {
|
|
||||||
return errors.New("key not exist")
|
|
||||||
}
|
|
||||||
delete(bc.items, key)
|
delete(bc.items, key)
|
||||||
if _, ok := bc.items[key]; ok {
|
|
||||||
return errors.New("delete key error")
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,7 +125,7 @@ func (bc *MemoryCache) Incr(ctx context.Context, key string) error {
|
|||||||
defer bc.Unlock()
|
defer bc.Unlock()
|
||||||
itm, ok := bc.items[key]
|
itm, ok := bc.items[key]
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("key not exist")
|
return ErrKeyNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
val, err := incr(itm.val)
|
val, err := incr(itm.val)
|
||||||
@ -145,7 +142,7 @@ func (bc *MemoryCache) Decr(ctx context.Context, key string) error {
|
|||||||
defer bc.Unlock()
|
defer bc.Unlock()
|
||||||
itm, ok := bc.items[key]
|
itm, ok := bc.items[key]
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("key not exist")
|
return ErrKeyNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
val, err := decr(itm.val)
|
val, err := decr(itm.val)
|
||||||
@ -177,7 +174,9 @@ func (bc *MemoryCache) ClearAll(context.Context) error {
|
|||||||
// StartAndGC starts memory cache. Checks expiration in every clock time.
|
// StartAndGC starts memory cache. Checks expiration in every clock time.
|
||||||
func (bc *MemoryCache) StartAndGC(config string) error {
|
func (bc *MemoryCache) StartAndGC(config string) error {
|
||||||
var cf map[string]int
|
var cf map[string]int
|
||||||
json.Unmarshal([]byte(config), &cf)
|
if err := json.Unmarshal([]byte(config), &cf); err != nil {
|
||||||
|
return berror.Wrapf(err, InvalidMemoryCacheCfg, "invalid config, please check your input: %s", config)
|
||||||
|
}
|
||||||
if _, ok := cf["interval"]; !ok {
|
if _, ok := cf["interval"]; !ok {
|
||||||
cf = make(map[string]int)
|
cf = make(map[string]int)
|
||||||
cf["interval"] = DefaultEvery
|
cf["interval"] = DefaultEvery
|
||||||
|
|||||||
59
client/cache/redis/redis.go
vendored
59
client/cache/redis/redis.go
vendored
@ -32,7 +32,6 @@ package redis
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -41,6 +40,7 @@ import (
|
|||||||
"github.com/gomodule/redigo/redis"
|
"github.com/gomodule/redigo/redis"
|
||||||
|
|
||||||
"github.com/beego/beego/v2/client/cache"
|
"github.com/beego/beego/v2/client/cache"
|
||||||
|
"github.com/beego/beego/v2/core/berror"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -67,15 +67,20 @@ func NewRedisCache() cache.Cache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Execute the redis commands. args[0] must be the key name
|
// Execute the redis commands. args[0] must be the key name
|
||||||
func (rc *Cache) do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
func (rc *Cache) do(commandName string, args ...interface{}) (interface{}, error) {
|
||||||
if len(args) < 1 {
|
|
||||||
return nil, errors.New("missing required arguments")
|
|
||||||
}
|
|
||||||
args[0] = rc.associate(args[0])
|
args[0] = rc.associate(args[0])
|
||||||
c := rc.p.Get()
|
c := rc.p.Get()
|
||||||
defer c.Close()
|
defer func() {
|
||||||
|
_ = c.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
return c.Do(commandName, args...)
|
reply, err := c.Do(commandName, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, berror.Wrapf(err, cache.RedisCacheCurdFailed,
|
||||||
|
"could not execute this command: %s", commandName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// associate with config key.
|
// associate with config key.
|
||||||
@ -95,7 +100,9 @@ func (rc *Cache) Get(ctx context.Context, key string) (interface{}, error) {
|
|||||||
// GetMulti gets cache from redis.
|
// GetMulti gets cache from redis.
|
||||||
func (rc *Cache) GetMulti(ctx context.Context, keys []string) ([]interface{}, error) {
|
func (rc *Cache) GetMulti(ctx context.Context, keys []string) ([]interface{}, error) {
|
||||||
c := rc.p.Get()
|
c := rc.p.Get()
|
||||||
defer c.Close()
|
defer func() {
|
||||||
|
_ = c.Close()
|
||||||
|
}()
|
||||||
var args []interface{}
|
var args []interface{}
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
args = append(args, rc.associate(key))
|
args = append(args, rc.associate(key))
|
||||||
@ -137,13 +144,16 @@ func (rc *Cache) Decr(ctx context.Context, key string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ClearAll deletes all cache in the redis collection
|
// ClearAll deletes all cache in the redis collection
|
||||||
|
// Be careful about this method, because it scans all keys and the delete them one by one
|
||||||
func (rc *Cache) ClearAll(context.Context) error {
|
func (rc *Cache) ClearAll(context.Context) error {
|
||||||
cachedKeys, err := rc.Scan(rc.key + ":*")
|
cachedKeys, err := rc.Scan(rc.key + ":*")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c := rc.p.Get()
|
c := rc.p.Get()
|
||||||
defer c.Close()
|
defer func() {
|
||||||
|
_ = c.Close()
|
||||||
|
}()
|
||||||
for _, str := range cachedKeys {
|
for _, str := range cachedKeys {
|
||||||
if _, err = c.Do("DEL", str); err != nil {
|
if _, err = c.Do("DEL", str); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -155,7 +165,9 @@ func (rc *Cache) ClearAll(context.Context) error {
|
|||||||
// Scan scans all keys matching a given pattern.
|
// Scan scans all keys matching a given pattern.
|
||||||
func (rc *Cache) Scan(pattern string) (keys []string, err error) {
|
func (rc *Cache) Scan(pattern string) (keys []string, err error) {
|
||||||
c := rc.p.Get()
|
c := rc.p.Get()
|
||||||
defer c.Close()
|
defer func() {
|
||||||
|
_ = c.Close()
|
||||||
|
}()
|
||||||
var (
|
var (
|
||||||
cursor uint64 = 0 // start
|
cursor uint64 = 0 // start
|
||||||
result []interface{}
|
result []interface{}
|
||||||
@ -186,13 +198,16 @@ func (rc *Cache) Scan(pattern string) (keys []string, err error) {
|
|||||||
// Cached items in redis are stored forever, no garbage collection happens
|
// Cached items in redis are stored forever, no garbage collection happens
|
||||||
func (rc *Cache) StartAndGC(config string) error {
|
func (rc *Cache) StartAndGC(config string) error {
|
||||||
var cf map[string]string
|
var cf map[string]string
|
||||||
json.Unmarshal([]byte(config), &cf)
|
err := json.Unmarshal([]byte(config), &cf)
|
||||||
|
if err != nil {
|
||||||
|
return berror.Wrapf(err, cache.InvalidRedisCacheCfg, "could not unmarshal the config: %s", config)
|
||||||
|
}
|
||||||
|
|
||||||
if _, ok := cf["key"]; !ok {
|
if _, ok := cf["key"]; !ok {
|
||||||
cf["key"] = DefaultKey
|
cf["key"] = DefaultKey
|
||||||
}
|
}
|
||||||
if _, ok := cf["conn"]; !ok {
|
if _, ok := cf["conn"]; !ok {
|
||||||
return errors.New("config has no conn key")
|
return berror.Wrapf(err, cache.InvalidRedisCacheCfg, "config missing conn field. ", config)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Format redis://<password>@<host>:<port>
|
// Format redis://<password>@<host>:<port>
|
||||||
@ -229,9 +244,16 @@ func (rc *Cache) StartAndGC(config string) error {
|
|||||||
rc.connectInit()
|
rc.connectInit()
|
||||||
|
|
||||||
c := rc.p.Get()
|
c := rc.p.Get()
|
||||||
defer c.Close()
|
defer func() {
|
||||||
|
_ = c.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
return c.Err()
|
// test connection
|
||||||
|
if err = c.Err(); err != nil {
|
||||||
|
return berror.Wrapf(err, cache.InvalidConnection,
|
||||||
|
"can not connect to remote redis server, please check the connection info and network state: %s", config)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// connect to redis.
|
// connect to redis.
|
||||||
@ -239,19 +261,20 @@ func (rc *Cache) connectInit() {
|
|||||||
dialFunc := func() (c redis.Conn, err error) {
|
dialFunc := func() (c redis.Conn, err error) {
|
||||||
c, err = redis.Dial("tcp", rc.conninfo)
|
c, err = redis.Dial("tcp", rc.conninfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, berror.Wrapf(err, cache.DialFailed,
|
||||||
|
"could not dial to remote server: %s ", rc.conninfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rc.password != "" {
|
if rc.password != "" {
|
||||||
if _, err := c.Do("AUTH", rc.password); err != nil {
|
if _, err = c.Do("AUTH", rc.password); err != nil {
|
||||||
c.Close()
|
_ = c.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, selecterr := c.Do("SELECT", rc.dbNum)
|
_, selecterr := c.Do("SELECT", rc.dbNum)
|
||||||
if selecterr != nil {
|
if selecterr != nil {
|
||||||
c.Close()
|
_ = c.Close()
|
||||||
return nil, selecterr
|
return nil, selecterr
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|||||||
109
client/cache/ssdb/ssdb.go
vendored
109
client/cache/ssdb/ssdb.go
vendored
@ -3,7 +3,6 @@ package ssdb
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -12,6 +11,7 @@ import (
|
|||||||
"github.com/ssdb/gossdb/ssdb"
|
"github.com/ssdb/gossdb/ssdb"
|
||||||
|
|
||||||
"github.com/beego/beego/v2/client/cache"
|
"github.com/beego/beego/v2/client/cache"
|
||||||
|
"github.com/beego/beego/v2/core/berror"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Cache SSDB adapter
|
// Cache SSDB adapter
|
||||||
@ -27,31 +27,21 @@ func NewSsdbCache() cache.Cache {
|
|||||||
|
|
||||||
// Get gets a key's value from memcache.
|
// Get gets a key's value from memcache.
|
||||||
func (rc *Cache) Get(ctx context.Context, key string) (interface{}, error) {
|
func (rc *Cache) Get(ctx context.Context, key string) (interface{}, error) {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
value, err := rc.conn.Get(key)
|
value, err := rc.conn.Get(key)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return value, nil
|
return value, nil
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, berror.Wrapf(err, cache.SsdbCacheCurdFailed, "could not get value, key: %s", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMulti gets one or keys values from ssdb.
|
// GetMulti gets one or keys values from ssdb.
|
||||||
func (rc *Cache) GetMulti(ctx context.Context, keys []string) ([]interface{}, error) {
|
func (rc *Cache) GetMulti(ctx context.Context, keys []string) ([]interface{}, error) {
|
||||||
size := len(keys)
|
size := len(keys)
|
||||||
values := make([]interface{}, size)
|
values := make([]interface{}, size)
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return values, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := rc.conn.Do("multi_get", keys)
|
res, err := rc.conn.Do("multi_get", keys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return values, err
|
return values, berror.Wrapf(err, cache.SsdbCacheCurdFailed, "multi_get failed, key: %v", keys)
|
||||||
}
|
}
|
||||||
|
|
||||||
resSize := len(res)
|
resSize := len(res)
|
||||||
@ -70,7 +60,7 @@ func (rc *Cache) GetMulti(ctx context.Context, keys []string) ([]interface{}, er
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(keysErr) != 0 {
|
if len(keysErr) != 0 {
|
||||||
return values, fmt.Errorf(strings.Join(keysErr, "; "))
|
return values, berror.Error(cache.MultiGetFailed, strings.Join(keysErr, "; "))
|
||||||
}
|
}
|
||||||
|
|
||||||
return values, nil
|
return values, nil
|
||||||
@ -78,26 +68,16 @@ func (rc *Cache) GetMulti(ctx context.Context, keys []string) ([]interface{}, er
|
|||||||
|
|
||||||
// DelMulti deletes one or more keys from memcache
|
// DelMulti deletes one or more keys from memcache
|
||||||
func (rc *Cache) DelMulti(keys []string) error {
|
func (rc *Cache) DelMulti(keys []string) error {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := rc.conn.Do("multi_del", keys)
|
_, err := rc.conn.Do("multi_del", keys)
|
||||||
return err
|
return berror.Wrapf(err, cache.SsdbCacheCurdFailed, "multi_del failed: %v", keys)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put puts value into memcache.
|
// Put puts value into memcache.
|
||||||
// value: must be of type string
|
// value: must be of type string
|
||||||
func (rc *Cache) Put(ctx context.Context, key string, val interface{}, timeout time.Duration) error {
|
func (rc *Cache) Put(ctx context.Context, key string, val interface{}, timeout time.Duration) error {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
v, ok := val.(string)
|
v, ok := val.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("value must string")
|
return berror.Errorf(cache.InvalidSsdbCacheValue, "value must be string: %v", val)
|
||||||
}
|
}
|
||||||
var resp []string
|
var resp []string
|
||||||
var err error
|
var err error
|
||||||
@ -108,57 +88,37 @@ func (rc *Cache) Put(ctx context.Context, key string, val interface{}, timeout t
|
|||||||
resp, err = rc.conn.Do("setx", key, v, ttl)
|
resp, err = rc.conn.Do("setx", key, v, ttl)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return berror.Wrapf(err, cache.SsdbCacheCurdFailed, "set or setx failed, key: %s", key)
|
||||||
}
|
}
|
||||||
if len(resp) == 2 && resp[0] == "ok" {
|
if len(resp) == 2 && resp[0] == "ok" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return errors.New("bad response")
|
return berror.Errorf(cache.SsdbBadResponse, "the response from SSDB server is invalid: %v", resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes a value in memcache.
|
// Delete deletes a value in memcache.
|
||||||
func (rc *Cache) Delete(ctx context.Context, key string) error {
|
func (rc *Cache) Delete(ctx context.Context, key string) error {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := rc.conn.Del(key)
|
_, err := rc.conn.Del(key)
|
||||||
return err
|
return berror.Wrapf(err, cache.SsdbCacheCurdFailed, "del failed: %s", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Incr increases a key's counter.
|
// Incr increases a key's counter.
|
||||||
func (rc *Cache) Incr(ctx context.Context, key string) error {
|
func (rc *Cache) Incr(ctx context.Context, key string) error {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := rc.conn.Do("incr", key, 1)
|
_, err := rc.conn.Do("incr", key, 1)
|
||||||
return err
|
return berror.Wrapf(err, cache.SsdbCacheCurdFailed, "increase failed: %s", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decr decrements a key's counter.
|
// Decr decrements a key's counter.
|
||||||
func (rc *Cache) Decr(ctx context.Context, key string) error {
|
func (rc *Cache) Decr(ctx context.Context, key string) error {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := rc.conn.Do("incr", key, -1)
|
_, err := rc.conn.Do("incr", key, -1)
|
||||||
return err
|
return berror.Wrapf(err, cache.SsdbCacheCurdFailed, "decrease failed: %s", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsExist checks if a key exists in memcache.
|
// IsExist checks if a key exists in memcache.
|
||||||
func (rc *Cache) IsExist(ctx context.Context, key string) (bool, error) {
|
func (rc *Cache) IsExist(ctx context.Context, key string) (bool, error) {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
resp, err := rc.conn.Do("exists", key)
|
resp, err := rc.conn.Do("exists", key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, berror.Wrapf(err, cache.SsdbCacheCurdFailed, "exists failed: %s", key)
|
||||||
}
|
}
|
||||||
if len(resp) == 2 && resp[1] == "1" {
|
if len(resp) == 2 && resp[1] == "1" {
|
||||||
return true, nil
|
return true, nil
|
||||||
@ -167,13 +127,9 @@ func (rc *Cache) IsExist(ctx context.Context, key string) (bool, error) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClearAll clears all cached items in memcache.
|
// ClearAll clears all cached items in ssdb.
|
||||||
|
// If there are many keys, this method may spent much time.
|
||||||
func (rc *Cache) ClearAll(context.Context) error {
|
func (rc *Cache) ClearAll(context.Context) error {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
keyStart, keyEnd, limit := "", "", 50
|
keyStart, keyEnd, limit := "", "", 50
|
||||||
resp, err := rc.Scan(keyStart, keyEnd, limit)
|
resp, err := rc.Scan(keyStart, keyEnd, limit)
|
||||||
for err == nil {
|
for err == nil {
|
||||||
@ -187,21 +143,16 @@ func (rc *Cache) ClearAll(context.Context) error {
|
|||||||
}
|
}
|
||||||
_, e := rc.conn.Do("multi_del", keys)
|
_, e := rc.conn.Do("multi_del", keys)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return e
|
return berror.Wrapf(e, cache.SsdbCacheCurdFailed, "multi_del failed: %v", keys)
|
||||||
}
|
}
|
||||||
keyStart = resp[size-2]
|
keyStart = resp[size-2]
|
||||||
resp, err = rc.Scan(keyStart, keyEnd, limit)
|
resp, err = rc.Scan(keyStart, keyEnd, limit)
|
||||||
}
|
}
|
||||||
return err
|
return berror.Wrap(err, cache.SsdbCacheCurdFailed, "scan failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scan key all cached in ssdb.
|
// Scan key all cached in ssdb.
|
||||||
func (rc *Cache) Scan(keyStart string, keyEnd string, limit int) ([]string, error) {
|
func (rc *Cache) Scan(keyStart string, keyEnd string, limit int) ([]string, error) {
|
||||||
if rc.conn == nil {
|
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
resp, err := rc.conn.Do("scan", keyStart, keyEnd, limit)
|
resp, err := rc.conn.Do("scan", keyStart, keyEnd, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -214,30 +165,36 @@ func (rc *Cache) Scan(keyStart string, keyEnd string, limit int) ([]string, erro
|
|||||||
// If an error occurs during connection, an error is returned
|
// If an error occurs during connection, an error is returned
|
||||||
func (rc *Cache) StartAndGC(config string) error {
|
func (rc *Cache) StartAndGC(config string) error {
|
||||||
var cf map[string]string
|
var cf map[string]string
|
||||||
json.Unmarshal([]byte(config), &cf)
|
err := json.Unmarshal([]byte(config), &cf)
|
||||||
|
if err != nil {
|
||||||
|
return berror.Wrapf(err, cache.InvalidSsdbCacheCfg,
|
||||||
|
"unmarshal this config failed, it must be a valid json string: %s", config)
|
||||||
|
}
|
||||||
if _, ok := cf["conn"]; !ok {
|
if _, ok := cf["conn"]; !ok {
|
||||||
return errors.New("config has no conn key")
|
return berror.Wrapf(err, cache.InvalidSsdbCacheCfg,
|
||||||
|
"Missing conn field: %s", config)
|
||||||
}
|
}
|
||||||
rc.conninfo = strings.Split(cf["conn"], ";")
|
rc.conninfo = strings.Split(cf["conn"], ";")
|
||||||
if rc.conn == nil {
|
return rc.connectInit()
|
||||||
if err := rc.connectInit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// connect to memcache and keep the connection.
|
// connect to memcache and keep the connection.
|
||||||
func (rc *Cache) connectInit() error {
|
func (rc *Cache) connectInit() error {
|
||||||
conninfoArray := strings.Split(rc.conninfo[0], ":")
|
conninfoArray := strings.Split(rc.conninfo[0], ":")
|
||||||
|
if len(conninfoArray) < 2 {
|
||||||
|
return berror.Errorf(cache.InvalidSsdbCacheCfg, "The value of conn should be host:port: %s", rc.conninfo[0])
|
||||||
|
}
|
||||||
host := conninfoArray[0]
|
host := conninfoArray[0]
|
||||||
port, e := strconv.Atoi(conninfoArray[1])
|
port, e := strconv.Atoi(conninfoArray[1])
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return e
|
return berror.Errorf(cache.InvalidSsdbCacheCfg, "Port is invalid. It must be integer, %s", rc.conninfo[0])
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
rc.conn, err = ssdb.Connect(host, port)
|
if rc.conn, err = ssdb.Connect(host, port); err != nil {
|
||||||
return err
|
return berror.Wrapf(err, cache.InvalidConnection,
|
||||||
|
"could not connect to SSDB, please check your connection info, network and firewall: %s", rc.conninfo[0])
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user