feature: add write-double-delete cache mode (#5263)
This commit is contained in:
parent
6f803ec9a9
commit
ec884b96b1
@ -2,6 +2,7 @@
|
||||
- [httplib: fix unstable unit test which use the httplib.org](https://github.com/beego/beego/pull/5232)
|
||||
- [rft: remove adapter package](https://github.com/beego/beego/pull/5239)
|
||||
- [feat: add write-delete cache mode](https://github.com/beego/beego/pull/5242)
|
||||
- [feat: add write-double-delete cache mode](https://github.com/beego/beego/pull/5243)
|
||||
- [fix 5255: Check the rows.Err() if rows.Next() is false](https://github.com/beego/beego/pull/5256)
|
||||
- [orm: missing handling %COL% placeholder](https://github.com/beego/beego/pull/5257)
|
||||
- [fix: use of ioutil package](https://github.com/beego/beego/pull/5261)
|
||||
|
||||
4
client/cache/error_code.go
vendored
4
client/cache/error_code.go
vendored
@ -189,6 +189,10 @@ The response from SSDB server is invalid.
|
||||
Usually it indicates something wrong on server side.
|
||||
`)
|
||||
|
||||
var DeleteFailed = berror.DefineCode(5002008, moduleName, "DeleteFailed", `
|
||||
Beego attempt to delete cache item failed. Please check if the target key is correct.
|
||||
`)
|
||||
|
||||
var (
|
||||
ErrKeyExpired = berror.Error(KeyExpired, "the key is expired")
|
||||
ErrKeyNotExist = berror.Error(KeyNotExist, "the key isn't exist")
|
||||
|
||||
46
client/cache/write_delete.go
vendored
46
client/cache/write_delete.go
vendored
@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/beego/beego/v2/core/berror"
|
||||
)
|
||||
@ -48,3 +49,48 @@ func (w *WriteDeleteCache) Set(ctx context.Context, key string, val any) error {
|
||||
}
|
||||
return w.Cache.Delete(ctx, key)
|
||||
}
|
||||
|
||||
// WriteDoubleDeleteCache creates write double delete cache pattern decorator.
|
||||
// The fn is the function that persistent the key and val.
|
||||
// it will delete the key from cache when you call Set function, and wait for interval, it will delete the key from cache one more time.
|
||||
// This pattern help to reduce the possibility of data inconsistencies, but it's still possible to be inconsistent among database and cache.
|
||||
type WriteDoubleDeleteCache struct {
|
||||
Cache
|
||||
interval time.Duration
|
||||
timeout time.Duration
|
||||
storeFunc func(ctx context.Context, key string, val any) error
|
||||
}
|
||||
|
||||
type WriteDoubleDeleteCacheOption func(c *WriteDoubleDeleteCache)
|
||||
|
||||
func NewWriteDoubleDeleteCache(cache Cache, interval, timeout time.Duration,
|
||||
fn func(ctx context.Context, key string, val any) error) (*WriteDoubleDeleteCache, error) {
|
||||
if fn == nil || cache == nil {
|
||||
return nil, berror.Error(InvalidInitParameters, "cache or storeFunc can not be nil")
|
||||
}
|
||||
|
||||
return &WriteDoubleDeleteCache{
|
||||
Cache: cache,
|
||||
interval: interval,
|
||||
timeout: timeout,
|
||||
storeFunc: fn,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *WriteDoubleDeleteCache) Set(
|
||||
ctx context.Context, key string, val any) error {
|
||||
err := c.storeFunc(ctx, key, val)
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||
return berror.Wrap(err, PersistCacheFailed, fmt.Sprintf("key: %s, val: %v", key, val))
|
||||
}
|
||||
time.AfterFunc(c.interval, func() {
|
||||
rCtx, cancel := context.WithTimeout(context.Background(), c.timeout)
|
||||
_ = c.Cache.Delete(rCtx, key)
|
||||
cancel()
|
||||
})
|
||||
err = c.Cache.Delete(ctx, key)
|
||||
if err != nil {
|
||||
return berror.Wrap(err, DeleteFailed, fmt.Sprintf("key: %s", key))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
189
client/cache/write_delete_test.go
vendored
189
client/cache/write_delete_test.go
vendored
@ -22,11 +22,200 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/beego/beego/v2/core/berror"
|
||||
)
|
||||
|
||||
func TestWriteDoubleDeleteCache_Set(t *testing.T) {
|
||||
mockDbStore := make(map[string]any)
|
||||
|
||||
cancels := make([]func(), 0)
|
||||
defer func() {
|
||||
for _, cancel := range cancels {
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
timeout := time.Second * 3
|
||||
testCases := []struct {
|
||||
name string
|
||||
cache Cache
|
||||
storeFunc func(ctx context.Context, key string, val any) error
|
||||
ctx context.Context
|
||||
interval time.Duration
|
||||
sleepSecond time.Duration
|
||||
key string
|
||||
value any
|
||||
wantErr error
|
||||
}{
|
||||
{
|
||||
name: "store key/value in db fail",
|
||||
interval: time.Second,
|
||||
cache: NewMemoryCache(),
|
||||
storeFunc: func(ctx context.Context, key string, val any) error {
|
||||
return errors.New("failed")
|
||||
},
|
||||
ctx: context.TODO(),
|
||||
wantErr: berror.Wrap(errors.New("failed"), PersistCacheFailed,
|
||||
fmt.Sprintf("key: %s, val: %v", "", nil)),
|
||||
},
|
||||
{
|
||||
name: "store key/value success",
|
||||
interval: time.Second * 2,
|
||||
sleepSecond: time.Second * 3,
|
||||
cache: func() Cache {
|
||||
cache := NewMemoryCache()
|
||||
err := cache.Put(context.Background(), "hello", "world", time.Second*2)
|
||||
require.NoError(t, err)
|
||||
return cache
|
||||
}(),
|
||||
storeFunc: func(ctx context.Context, key string, val any) error {
|
||||
mockDbStore[key] = val
|
||||
return nil
|
||||
},
|
||||
ctx: context.TODO(),
|
||||
key: "hello",
|
||||
value: "world",
|
||||
},
|
||||
{
|
||||
name: "store key/value timeout",
|
||||
interval: time.Second * 2,
|
||||
sleepSecond: time.Second * 3,
|
||||
cache: func() Cache {
|
||||
cache := NewMemoryCache()
|
||||
err := cache.Put(context.Background(), "hello", "hello", time.Second*2)
|
||||
require.NoError(t, err)
|
||||
return cache
|
||||
}(),
|
||||
storeFunc: func(ctx context.Context, key string, val any) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(3 * time.Second):
|
||||
mockDbStore[key] = val
|
||||
return nil
|
||||
}
|
||||
},
|
||||
ctx: func() context.Context {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
cancels = append(cancels, cancel)
|
||||
return ctx
|
||||
|
||||
}(),
|
||||
key: "hello",
|
||||
value: "hello",
|
||||
},
|
||||
}
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cache := tt.cache
|
||||
c, err := NewWriteDoubleDeleteCache(cache, tt.interval, timeout, tt.storeFunc)
|
||||
if err != nil {
|
||||
assert.EqualError(t, tt.wantErr, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
err = c.Set(tt.ctx, tt.key, tt.value)
|
||||
if err != nil {
|
||||
assert.EqualError(t, tt.wantErr, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
_, err = c.Get(tt.ctx, tt.key)
|
||||
assert.Equal(t, ErrKeyNotExist, err)
|
||||
|
||||
err = cache.Put(tt.ctx, tt.key, tt.value, tt.interval)
|
||||
require.NoError(t, err)
|
||||
|
||||
val, err := c.Get(tt.ctx, tt.key)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tt.value, val)
|
||||
|
||||
time.Sleep(tt.sleepSecond)
|
||||
|
||||
_, err = c.Get(tt.ctx, tt.key)
|
||||
assert.Equal(t, ErrKeyNotExist, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewWriteDoubleDeleteCache(t *testing.T) {
|
||||
underlyingCache := NewMemoryCache()
|
||||
storeFunc := func(ctx context.Context, key string, val any) error { return nil }
|
||||
|
||||
type args struct {
|
||||
cache Cache
|
||||
interval time.Duration
|
||||
fn func(ctx context.Context, key string, val any) error
|
||||
}
|
||||
timeout := time.Second * 3
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantRes *WriteDoubleDeleteCache
|
||||
wantErr error
|
||||
}{
|
||||
{
|
||||
name: "nil cache parameters",
|
||||
args: args{
|
||||
cache: nil,
|
||||
fn: storeFunc,
|
||||
},
|
||||
wantErr: berror.Error(InvalidInitParameters, "cache or storeFunc can not be nil"),
|
||||
},
|
||||
{
|
||||
name: "nil storeFunc parameters",
|
||||
args: args{
|
||||
cache: underlyingCache,
|
||||
fn: nil,
|
||||
},
|
||||
wantErr: berror.Error(InvalidInitParameters, "cache or storeFunc can not be nil"),
|
||||
},
|
||||
{
|
||||
name: "init write-though cache success",
|
||||
args: args{
|
||||
cache: underlyingCache,
|
||||
fn: storeFunc,
|
||||
interval: time.Second,
|
||||
},
|
||||
wantRes: &WriteDoubleDeleteCache{
|
||||
Cache: underlyingCache,
|
||||
storeFunc: storeFunc,
|
||||
interval: time.Second,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
_, err := NewWriteDoubleDeleteCache(tt.args.cache, tt.args.interval, timeout, tt.args.fn)
|
||||
assert.Equal(t, tt.wantErr, err)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleWriteDoubleDeleteCache() {
|
||||
c := NewMemoryCache()
|
||||
wtc, err := NewWriteDoubleDeleteCache(c, 1*time.Second, 3*time.Second, func(ctx context.Context, key string, val any) error {
|
||||
fmt.Printf("write data to somewhere key %s, val %v \n", key, val)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = wtc.Set(context.Background(),
|
||||
"/biz/user/id=1", "I am user 1")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// Output:
|
||||
// write data to somewhere key /biz/user/id=1, val I am user 1
|
||||
}
|
||||
|
||||
func TestWriteDeleteCache_Set(t *testing.T) {
|
||||
mockDbStore := make(map[string]any)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user