You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
358 lines
12 KiB
358 lines
12 KiB
// Copyright © 2023 OpenIM SDK. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package syncer
|
|
|
|
import (
|
|
"context"
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/openimsdk/openim-sdk-core/v3/internal/util"
|
|
"github.com/openimsdk/openim-sdk-core/v3/pkg/page"
|
|
"github.com/openimsdk/tools/errs"
|
|
"github.com/openimsdk/tools/utils/datautil"
|
|
"reflect"
|
|
|
|
"github.com/openimsdk/tools/log"
|
|
)
|
|
|
|
// New creates a new Syncer instance with the provided synchronization functions.
|
|
// This function requires several callback functions to handle different aspects of data synchronization:
|
|
// - insert: A function to insert new data.
|
|
// - delete: A function to delete existing data.
|
|
// - update: A function to update existing data.
|
|
// - uuid: A function to generate a unique identifier for each data item.
|
|
// - equal: A function to check if two data items are equal.
|
|
// - notice: A function to handle notifications during the sync process.
|
|
// Panics if insert, delete, update, or uuid functions are not provided.
|
|
func New[T, RESP any, V comparable](
|
|
insert func(ctx context.Context, value T) error,
|
|
delete func(ctx context.Context, value T) error,
|
|
update func(ctx context.Context, server T, local T) error,
|
|
uuid func(value T) V,
|
|
equal func(a T, b T) bool,
|
|
notice func(ctx context.Context, state int, server, local T) error,
|
|
) *Syncer[T, RESP, V] {
|
|
if insert == nil || update == nil || delete == nil || uuid == nil {
|
|
panic("invalid params")
|
|
}
|
|
|
|
// Determine the type of T and remove pointer indirection if necessary.
|
|
var t T
|
|
tof := reflect.TypeOf(&t)
|
|
for tof.Kind() == reflect.Ptr {
|
|
tof = tof.Elem()
|
|
}
|
|
|
|
// Return a new Syncer instance with the provided functions and the type as a string.
|
|
return &Syncer[T, RESP, V]{
|
|
insert: insert,
|
|
update: update,
|
|
delete: delete,
|
|
uuid: uuid,
|
|
equal: equal,
|
|
notice: notice,
|
|
ts: tof.String(),
|
|
}
|
|
}
|
|
|
|
// Syncer is a struct that holds functions for synchronizing data.
|
|
// It includes functions for inserting, updating, deleting, and notifying,
|
|
// as well as functions for generating unique IDs and checking equality of data items.
|
|
type Syncer[T, RESP any, V comparable] struct {
|
|
insert func(ctx context.Context, server T) error
|
|
update func(ctx context.Context, server T, local T) error
|
|
delete func(ctx context.Context, local T) error
|
|
batchInsert func(ctx context.Context, servers []T) error
|
|
deleteAll func(ctx context.Context, entityID string) error
|
|
notice func(ctx context.Context, state int, server, local T) error
|
|
equal func(server T, local T) bool
|
|
uuid func(value T) V
|
|
batchPageReq func(entityID string) page.PageReq
|
|
batchPageRespConvertFunc func(resp *RESP) []T
|
|
reqApiRouter string
|
|
ts string // Represents the type of T as a string.
|
|
|
|
}
|
|
|
|
type NoResp struct{}
|
|
|
|
func New2[T, RESP any, V comparable](opts ...Option[T, RESP, V]) *Syncer[T, RESP, V] {
|
|
// Create a new Syncer instance.
|
|
s := &Syncer[T, RESP, V]{}
|
|
|
|
// Apply the options to the Syncer.
|
|
for _, opt := range opts {
|
|
opt(s)
|
|
}
|
|
|
|
// Check required functions.
|
|
if s.insert == nil || s.update == nil || s.delete == nil || s.uuid == nil {
|
|
panic("invalid params")
|
|
}
|
|
|
|
// Determine the type of T and remove pointer indirection if necessary.
|
|
var t T
|
|
tof := reflect.TypeOf(&t)
|
|
for tof.Kind() == reflect.Ptr {
|
|
tof = tof.Elem()
|
|
}
|
|
|
|
// Set the type string.
|
|
s.ts = tof.String()
|
|
|
|
return s
|
|
}
|
|
|
|
// Option is a function that configures a Syncer.
|
|
type Option[T, RESP any, V comparable] func(*Syncer[T, RESP, V])
|
|
|
|
// WithInsert sets the insert function for the Syncer.
|
|
func WithInsert[T, RESP any, V comparable](f func(ctx context.Context, value T) error) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.insert = f
|
|
}
|
|
}
|
|
|
|
// WithUpdate sets the update function for the Syncer.
|
|
func WithUpdate[T, RESP any, V comparable](f func(ctx context.Context, server T, local T) error) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.update = f
|
|
}
|
|
}
|
|
|
|
// WithDelete sets the delete function for the Syncer.
|
|
func WithDelete[T, RESP any, V comparable](f func(ctx context.Context, local T) error) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.delete = f
|
|
}
|
|
}
|
|
|
|
// WithBatchInsert sets the batchInsert function for the Syncer.
|
|
func WithBatchInsert[T, RESP any, V comparable](f func(ctx context.Context, values []T) error) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.batchInsert = f
|
|
}
|
|
}
|
|
|
|
// WithDeleteAll sets the deleteAll function for the Syncer.
|
|
func WithDeleteAll[T, RESP any, V comparable](f func(ctx context.Context, entityID string) error) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.deleteAll = f
|
|
}
|
|
}
|
|
|
|
// WithUUID sets the uuid function for the Syncer.
|
|
func WithUUID[T, RESP any, V comparable](f func(value T) V) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.uuid = f
|
|
}
|
|
}
|
|
|
|
// WithEqual sets the equal function for the Syncer.
|
|
func WithEqual[T, RESP any, V comparable](f func(a T, b T) bool) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.equal = f
|
|
}
|
|
}
|
|
|
|
// WithNotice sets the notice function for the Syncer.
|
|
func WithNotice[T, RESP any, V comparable](f func(ctx context.Context, state int, server, local T) error) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.notice = f
|
|
}
|
|
}
|
|
|
|
// WithBatchPageReq sets the batchPageReq for the Syncer.
|
|
func WithBatchPageReq[T, RESP any, V comparable](f func(entityID string) page.PageReq) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.batchPageReq = f
|
|
}
|
|
}
|
|
|
|
// WithBatchPageRespConvertFunc sets the batchPageRespConvertFunc function for the Syncer.
|
|
func WithBatchPageRespConvertFunc[T, RESP any, V comparable](f func(resp *RESP) []T) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.batchPageRespConvertFunc = f
|
|
}
|
|
}
|
|
|
|
// WithReqApiRouter sets the reqApiRouter for the Syncer.
|
|
func WithReqApiRouter[T, RESP any, V comparable](router string) Option[T, RESP, V] {
|
|
return func(s *Syncer[T, RESP, V]) {
|
|
s.reqApiRouter = router
|
|
}
|
|
}
|
|
|
|
// NewSyncer creates a new Syncer with the provided options.
|
|
func NewSyncer[T, RESP any, V comparable](opts ...Option[T, RESP, V]) *Syncer[T, RESP, V] {
|
|
syncer := &Syncer[T, RESP, V]{}
|
|
for _, opt := range opts {
|
|
opt(syncer)
|
|
}
|
|
return syncer
|
|
}
|
|
|
|
// eq is a helper function to check equality of two data items.
|
|
// It uses the equal function if provided; otherwise, it falls back to the cmp.Equal function.
|
|
func (s *Syncer[T, RESP, V]) eq(server T, local T) bool {
|
|
if s.equal != nil {
|
|
return s.equal(server, local)
|
|
}
|
|
return cmp.Equal(server, local)
|
|
}
|
|
|
|
// onNotice is a helper function to handle notifications.
|
|
// It calls the Syncer's notice function and the provided notice function in sequence if they are not nil.
|
|
func (s *Syncer[T, RESP, V]) onNotice(ctx context.Context, state int, server, local T, fn func(ctx context.Context, state int, server, local T) error) error {
|
|
if s.notice != nil {
|
|
if err := s.notice(ctx, state, server, local); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if fn != nil {
|
|
if err := fn(ctx, state, server, local); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Sync synchronizes server data with local data.
|
|
// Sync synchronizes the data between the server and local storage.
|
|
// It takes a context, two slices of data (serverData and localData),
|
|
// a notice function to handle notifications during the sync process,
|
|
// and a variadic parameter skipDeletion to control deletion behavior.
|
|
func (s *Syncer[T, RESP, V]) Sync(ctx context.Context, serverData []T, localData []T, notice func(ctx context.Context, state int, server, local T) error, skipDeletionAndNotice ...bool) (err error) {
|
|
defer func() {
|
|
// Log the outcome of the synchronization process.
|
|
if err == nil {
|
|
log.ZDebug(ctx, "sync success", "type", s.ts)
|
|
} else {
|
|
log.ZError(ctx, "sync failed", err, "type", s.ts)
|
|
}
|
|
}()
|
|
skipDeletion := false
|
|
if len(skipDeletionAndNotice) > 0 {
|
|
skipDeletion = skipDeletionAndNotice[0]
|
|
}
|
|
|
|
// If both server and local data are empty, log and return.
|
|
if len(serverData) == 0 && len(localData) == 0 {
|
|
log.ZDebug(ctx, "sync both the server and client are empty", "type", s.ts)
|
|
return nil
|
|
}
|
|
|
|
// Convert local data into a map for easier lookup.
|
|
localMap := datautil.SliceToMap(localData, func(item T) V {
|
|
return s.uuid(item)
|
|
})
|
|
|
|
// Iterate through server data to sync with local data.
|
|
for i := range serverData {
|
|
server := serverData[i]
|
|
id := s.uuid(server)
|
|
local, ok := localMap[id]
|
|
|
|
// If the item doesn't exist locally, insert it.
|
|
if !ok {
|
|
log.ZDebug(ctx, "sync insert", "type", s.ts, "server", server)
|
|
if err := s.insert(ctx, server); err != nil {
|
|
log.ZError(ctx, "sync insert failed", err, "type", s.ts, "server", server, "local", local)
|
|
return err
|
|
}
|
|
if err := s.onNotice(ctx, Insert, server, local, notice); err != nil {
|
|
log.ZError(ctx, "sync notice insert failed", err, "type", s.ts, "server", server, "local", local)
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Remove the item from the local map as it's found.
|
|
delete(localMap, id)
|
|
|
|
// If the local and server items are equal, notify and continue.
|
|
if s.eq(server, local) {
|
|
if err := s.onNotice(ctx, Unchanged, local, server, notice); err != nil {
|
|
log.ZError(ctx, "sync notice unchanged failed", err, "type", s.ts, "server", server, "local", local)
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
|
|
log.ZDebug(ctx, "sync update", "type", s.ts, "server", server, "local", local)
|
|
// Update the local item with server data.
|
|
if err := s.update(ctx, server, local); err != nil {
|
|
log.ZError(ctx, "sync update failed", err, "type", s.ts, "server", server, "local", local)
|
|
return err
|
|
}
|
|
if err := s.onNotice(ctx, Update, server, local, notice); err != nil {
|
|
log.ZError(ctx, "sync notice update failed", err, "type", s.ts, "server", server, "local", local)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Check the skipDeletion flag; if set, skip deletion.
|
|
if skipDeletion {
|
|
return nil
|
|
}
|
|
log.ZDebug(ctx, "sync delete", "type", s.ts, "localMap", localMap)
|
|
// Delete any local items not present in server data.
|
|
for id := range localMap {
|
|
local := localMap[id]
|
|
if err := s.delete(ctx, local); err != nil {
|
|
log.ZError(ctx, "sync delete failed", err, "type", s.ts, "local", local)
|
|
return err
|
|
}
|
|
var server T
|
|
if err := s.onNotice(ctx, Delete, server, local, notice); err != nil {
|
|
log.ZError(ctx, "sync notice delete failed", err, "type", s.ts, "local", local)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Syncer[T, RESP, V]) FullSync(ctx context.Context, entityID string) (err error) {
|
|
defer func() {
|
|
// Log the outcome of the synchronization process.
|
|
if err == nil {
|
|
log.ZDebug(ctx, "full sync success", "type", s.ts)
|
|
} else {
|
|
log.ZError(ctx, "full sync failed", err, "type", s.ts)
|
|
}
|
|
}()
|
|
|
|
//// If server data is empty, log and return
|
|
//if len(serverData) == 0 {
|
|
// log.ZDebug(ctx, "full sync server data is empty", "type", s.ts)
|
|
// return nil
|
|
//}
|
|
|
|
// Clear local table data
|
|
if err = s.deleteAll(ctx, entityID); err != nil {
|
|
return errs.New("full sync delete all failed", "err", err.Error(), "type", s.ts)
|
|
}
|
|
|
|
// Get batch req
|
|
batchReq := s.batchPageReq(entityID)
|
|
|
|
// Batch page pull data and insert server data
|
|
if err = util.FetchAndInsertPagedData(ctx, s.reqApiRouter, batchReq, s.batchPageRespConvertFunc,
|
|
s.batchInsert, s.insert, 100); err != nil {
|
|
return errs.New("full sync batch insert failed", "err", err.Error(), "type", s.ts)
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|