From f0bd767d81ab8cadfb4f6e497044e3da9c28e598 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 28 Jun 2024 11:41:27 +0800 Subject: [PATCH] fix: reinstall app sync data split. --- .../conversation_notification.go | 79 +++++++++++++++---- .../conversation_msg/message_controller.go | 2 +- .../internal/conversation_msg/sync.go | 35 +++++++- go/chao-sdk-core/internal/friend/sync.go | 45 +++++++++++ go/chao-sdk-core/internal/group/sync.go | 28 +++++++ .../internal/interaction/msg_sync.go | 58 ++++++++++---- go/chao-sdk-core/internal/user/sync.go | 35 +++++++- go/chao-sdk-core/open_im_sdk/em.go | 12 ++- .../open_im_sdk_callback/callback_client.go | 8 +- .../pkg/common/trigger_channel.go | 20 ++--- go/chao-sdk-core/pkg/constant/constant.go | 2 + .../pkg/db/db_interface/databse.go | 1 + go/chao-sdk-core/pkg/db/notification_model.go | 7 ++ go/chao-sdk-core/test/t_conversation_msg.go | 6 +- go/export.go | 24 ++++-- 15 files changed, 292 insertions(+), 70 deletions(-) diff --git a/go/chao-sdk-core/internal/conversation_msg/conversation_notification.go b/go/chao-sdk-core/internal/conversation_msg/conversation_notification.go index 84305e6..68f09c8 100644 --- a/go/chao-sdk-core/internal/conversation_msg/conversation_notification.go +++ b/go/chao-sdk-core/internal/conversation_msg/conversation_notification.go @@ -38,7 +38,6 @@ func (c *Conversation) Work(c2v common.Cmd2Value) { switch c2v.Cmd { case constant.CmdNewMsgCome: c.doMsgNew(c2v) - case constant.CmdSuperGroupMsgCome: case constant.CmdUpdateConversation: c.doUpdateConversation(c2v) case constant.CmdUpdateMessage: @@ -54,26 +53,54 @@ func (c *Conversation) doNotificationNew(c2v common.Cmd2Value) { allMsg := c2v.Value.(sdk_struct.CmdNewMsgComeToConversation).Msgs syncFlag := c2v.Value.(sdk_struct.CmdNewMsgComeToConversation).SyncFlag switch syncFlag { - case constant.MsgSyncBegin: + case constant.AppDataSyncStart: + log.ZDebug(ctx, "AppDataSyncStart") c.startTime = time.Now() - c.ConversationListener().OnSyncServerStart() - if err := c.SyncAllConversationHashReadSeqs(ctx); err != nil { - log.ZError(ctx, "SyncConversationHashReadSeqs err", err) + c.ConversationListener().OnSyncServerStart(true) + syncFunctions := []func(c context.Context) error{ + c.SyncAllConversationHashReadSeqs, + c.user.SyncLoginUserInfoWithoutNotice, + c.friend.SyncAllBlackListWithoutNotice, + c.friend.SyncAllFriendApplicationWithoutNotice, + c.friend.SyncAllSelfFriendApplicationWithoutNotice, + c.group.SyncAllAdminGroupApplicationWithoutNotice, + c.group.SyncAllSelfGroupApplicationWithoutNotice, + c.user.SyncAllCommandWithoutNotice, + c.group.SyncAllJoinedGroupsAndMembers, + c.friend.IncrSyncFriends, + c.SyncAllConversationsWithoutNotice, } + totalFunctions := len(syncFunctions) + for i, syncFunc := range syncFunctions { + funcName := runtime.FuncForPC(reflect.ValueOf(syncFunc).Pointer()).Name() + startTime := time.Now() + err := syncFunc(ctx) + duration := time.Since(startTime) + if err != nil { + log.ZWarn(ctx, fmt.Sprintf("%s sync err", funcName), err, "duration", duration.Seconds()) + } else { + log.ZDebug(ctx, fmt.Sprintf("%s completed successfully", funcName), "duration", duration.Seconds()) + } + progress := int(float64(i+1) / float64(totalFunctions) * 100) + if progress == 0 { + progress = 1 + } + c.ConversationListener().OnSyncServerProgress(progress) + } + case constant.AppDataSyncFinish: + log.ZDebug(ctx, "AppDataSyncFinish", "time", time.Since(c.startTime).Milliseconds()) + c.ConversationListener().OnSyncServerFailed(true) + case constant.MsgSyncBegin: + log.ZDebug(ctx, "MsgSyncBegin") + c.startTime = time.Now() + c.ConversationListener().OnSyncServerStart(false) //clear SubscriptionStatusMap c.user.OnlineStatusCache.DeleteAll() - for _, syncFunc := range []func(c context.Context) error{ - c.user.SyncLoginUserInfo, - c.friend.SyncAllBlackList, c.friend.SyncAllFriendApplication, c.friend.SyncAllSelfFriendApplication, - c.group.SyncAllAdminGroupApplication, c.group.SyncAllSelfGroupApplication, c.user.SyncAllCommand, - } { - go func(syncFunc func(c context.Context) error) { - _ = syncFunc(ctx) - }(syncFunc) - } syncFunctions := []func(c context.Context) error{ - c.group.SyncAllJoinedGroupsAndMembers, c.friend.IncrSyncFriends, c.SyncAllConversations, + c.SyncAllConversationHashReadSeqs, + c.group.SyncAllJoinedGroupsAndMembers, + c.friend.IncrSyncFriends, } for _, syncFunc := range syncFunctions { @@ -87,11 +114,29 @@ func (c *Conversation) doNotificationNew(c2v common.Cmd2Value) { log.ZDebug(ctx, fmt.Sprintf("%s completed successfully", funcName), "duration", duration.Seconds()) } } + for _, syncFunc := range []func(c context.Context) error{ + c.user.SyncLoginUserInfo, + c.friend.SyncAllBlackList, c.friend.SyncAllFriendApplication, c.friend.SyncAllSelfFriendApplication, + c.group.SyncAllAdminGroupApplication, c.group.SyncAllSelfGroupApplication, c.user.SyncAllCommand, c.SyncAllConversations, + } { + go func(syncFunc func(c context.Context) error) { + funcName := runtime.FuncForPC(reflect.ValueOf(syncFunc).Pointer()).Name() + startTime := time.Now() + err := syncFunc(ctx) + duration := time.Since(startTime) + if err != nil { + log.ZWarn(ctx, fmt.Sprintf("%s sync err", funcName), err, "duration", duration.Seconds()) + } else { + log.ZDebug(ctx, fmt.Sprintf("%s completed successfully", funcName), "duration", duration.Seconds()) + } + }(syncFunc) + } + case constant.MsgSyncFailed: - c.ConversationListener().OnSyncServerFailed() + c.ConversationListener().OnSyncServerFailed(false) case constant.MsgSyncEnd: log.ZDebug(ctx, "MsgSyncEnd", "time", time.Since(c.startTime).Milliseconds()) - defer c.ConversationListener().OnSyncServerFinish() + c.ConversationListener().OnSyncServerFinish(false) } for conversationID, msgs := range allMsg { diff --git a/go/chao-sdk-core/internal/conversation_msg/message_controller.go b/go/chao-sdk-core/internal/conversation_msg/message_controller.go index 59678a3..3b413e9 100644 --- a/go/chao-sdk-core/internal/conversation_msg/message_controller.go +++ b/go/chao-sdk-core/internal/conversation_msg/message_controller.go @@ -50,7 +50,7 @@ func (m *MessageController) BatchUpdateMessageList(ctx context.Context, updateMs latestMsg := &sdk_struct.MsgStruct{} if err := json.Unmarshal([]byte(conversation.LatestMsg), latestMsg); err != nil { log.ZError(ctx, "Unmarshal err", err, "conversationID", - conversationID, "latestMsg", conversation.LatestMsg) + conversationID, "latestMsg", conversation.LatestMsg, "messages", messages) continue } for _, v := range messages { diff --git a/go/chao-sdk-core/internal/conversation_msg/sync.go b/go/chao-sdk-core/internal/conversation_msg/sync.go index ec87f67..bfb6261 100644 --- a/go/chao-sdk-core/internal/conversation_msg/sync.go +++ b/go/chao-sdk-core/internal/conversation_msg/sync.go @@ -54,6 +54,16 @@ func (c *Conversation) SyncConversations(ctx context.Context, conversationIDs [] } func (c *Conversation) SyncAllConversations(ctx context.Context) error { + ccTime := time.Now() + conversationsOnServer, err := c.getServerConversationList(ctx) + if err != nil { + return err + } + log.ZDebug(ctx, "get server cost time", "cost time", time.Since(ccTime), "conversation on server", conversationsOnServer) + return c.SyncConversationsAndTriggerCallback(ctx, conversationsOnServer, false) +} + +func (c *Conversation) SyncAllConversationsWithoutNotice(ctx context.Context) error { ccTime := time.Now() conversationsOnServer, err := c.getServerConversationList(ctx) if err != nil { @@ -64,25 +74,34 @@ func (c *Conversation) SyncAllConversations(ctx context.Context) error { } func (c *Conversation) SyncAllConversationHashReadSeqs(ctx context.Context) error { + startTime := time.Now() log.ZDebug(ctx, "start SyncConversationHashReadSeqs") + seqs, err := c.getServerHasReadAndMaxSeqs(ctx) if err != nil { return err } + log.ZDebug(ctx, "getServerHasReadAndMaxSeqs completed", "duration", time.Since(startTime).Seconds()) + if len(seqs) == 0 { return nil } var conversationChangedIDs []string var conversationIDsNeedSync []string + stepStartTime := time.Now() conversationsOnLocal, err := c.db.GetAllConversations(ctx) if err != nil { log.ZWarn(ctx, "get all conversations err", err) return err } + log.ZDebug(ctx, "GetAllConversations completed", "duration", time.Since(stepStartTime).Seconds()) + conversationsOnLocalMap := datautil.SliceToMap(conversationsOnLocal, func(e *model_struct.LocalConversation) string { return e.ConversationID }) + + stepStartTime = time.Now() for conversationID, v := range seqs { var unreadCount int32 c.maxSeqRecorder.Set(conversationID, v.MaxSeq) @@ -104,18 +123,24 @@ func (c *Conversation) SyncAllConversationHashReadSeqs(ctx context.Context) erro } else { conversationIDsNeedSync = append(conversationIDsNeedSync, conversationID) } - } + log.ZDebug(ctx, "Process seqs completed", "duration", time.Since(stepStartTime).Seconds()) + if len(conversationIDsNeedSync) > 0 { + stepStartTime = time.Now() conversationsOnServer, err := c.getServerConversationsByIDs(ctx, conversationIDsNeedSync) if err != nil { log.ZWarn(ctx, "getServerConversationsByIDs err", err, "conversationIDs", conversationIDsNeedSync) return err } + log.ZDebug(ctx, "getServerConversationsByIDs completed", "duration", time.Since(stepStartTime).Seconds()) + + stepStartTime = time.Now() if err := c.batchAddFaceURLAndName(ctx, conversationsOnServer...); err != nil { log.ZWarn(ctx, "batchAddFaceURLAndName err", err, "conversationsOnServer", conversationsOnServer) return err } + log.ZDebug(ctx, "batchAddFaceURLAndName completed", "duration", time.Since(stepStartTime).Seconds()) for _, conversation := range conversationsOnServer { var unreadCount int32 @@ -132,17 +157,23 @@ func (c *Conversation) SyncAllConversationHashReadSeqs(ctx context.Context) erro conversation.UnreadCount = unreadCount conversation.HasReadSeq = v.HasReadSeq } + + stepStartTime = time.Now() err = c.db.BatchInsertConversationList(ctx, conversationsOnServer) if err != nil { log.ZWarn(ctx, "BatchInsertConversationList err", err, "conversationsOnServer", conversationsOnServer) } - + log.ZDebug(ctx, "BatchInsertConversationList completed", "duration", time.Since(stepStartTime).Seconds()) } log.ZDebug(ctx, "update conversations", "conversations", conversationChangedIDs) if len(conversationChangedIDs) > 0 { + stepStartTime = time.Now() common.TriggerCmdUpdateConversation(ctx, common.UpdateConNode{Action: constant.ConChange, Args: conversationChangedIDs}, c.GetCh()) common.TriggerCmdUpdateConversation(ctx, common.UpdateConNode{Action: constant.TotalUnreadMessageChanged}, c.GetCh()) + log.ZDebug(ctx, "TriggerCmdUpdateConversation completed", "duration", time.Since(stepStartTime).Seconds()) } + + log.ZDebug(ctx, "SyncAllConversationHashReadSeqs completed", "totalDuration", time.Since(startTime).Seconds()) return nil } diff --git a/go/chao-sdk-core/internal/friend/sync.go b/go/chao-sdk-core/internal/friend/sync.go index 21caef6..4fd2979 100644 --- a/go/chao-sdk-core/internal/friend/sync.go +++ b/go/chao-sdk-core/internal/friend/sync.go @@ -61,6 +61,22 @@ func (f *Friend) SyncAllSelfFriendApplication(ctx context.Context) error { return f.requestSendSyncer.Sync(ctx, datautil.Batch(ServerFriendRequestToLocalFriendRequest, requests), localData, nil) } +func (f *Friend) SyncAllSelfFriendApplicationWithoutNotice(ctx context.Context) error { + req := &friend.GetPaginationFriendsApplyFromReq{UserID: f.loginUserID, Pagination: &sdkws.RequestPagination{}} + fn := func(resp *friend.GetPaginationFriendsApplyFromResp) []*sdkws.FriendRequest { + return resp.FriendRequests + } + requests, err := util.GetPageAll(ctx, constant.GetSelfFriendApplicationListRouter, req, fn) + if err != nil { + return err + } + localData, err := f.db.GetSendFriendApplication(ctx) + if err != nil { + return err + } + return f.requestSendSyncer.Sync(ctx, datautil.Batch(ServerFriendRequestToLocalFriendRequest, requests), localData, nil, false, true) +} + // recv func (f *Friend) SyncAllFriendApplication(ctx context.Context) error { req := &friend.GetPaginationFriendsApplyToReq{UserID: f.loginUserID, Pagination: &sdkws.RequestPagination{}} @@ -75,6 +91,19 @@ func (f *Friend) SyncAllFriendApplication(ctx context.Context) error { } return f.requestRecvSyncer.Sync(ctx, datautil.Batch(ServerFriendRequestToLocalFriendRequest, requests), localData, nil) } +func (f *Friend) SyncAllFriendApplicationWithoutNotice(ctx context.Context) error { + req := &friend.GetPaginationFriendsApplyToReq{UserID: f.loginUserID, Pagination: &sdkws.RequestPagination{}} + fn := func(resp *friend.GetPaginationFriendsApplyToResp) []*sdkws.FriendRequest { return resp.FriendRequests } + requests, err := util.GetPageAll(ctx, constant.GetFriendApplicationListRouter, req, fn) + if err != nil { + return err + } + localData, err := f.db.GetRecvFriendApplication(ctx) + if err != nil { + return err + } + return f.requestRecvSyncer.Sync(ctx, datautil.Batch(ServerFriendRequestToLocalFriendRequest, requests), localData, nil, false, true) +} func (f *Friend) SyncAllFriendList(ctx context.Context) error { t := time.Now() @@ -171,6 +200,22 @@ func (f *Friend) SyncAllBlackList(ctx context.Context) error { return f.blockSyncer.Sync(ctx, datautil.Batch(ServerBlackToLocalBlack, serverData), localData, nil) } +func (f *Friend) SyncAllBlackListWithoutNotice(ctx context.Context) error { + req := &friend.GetPaginationBlacksReq{UserID: f.loginUserID, Pagination: &sdkws.RequestPagination{}} + fn := func(resp *friend.GetPaginationBlacksResp) []*sdkws.BlackInfo { return resp.Blacks } + serverData, err := util.GetPageAll(ctx, constant.GetBlackListRouter, req, fn) + if err != nil { + return err + } + log.ZDebug(ctx, "black from server", "data", serverData) + localData, err := f.db.GetBlackListDB(ctx) + if err != nil { + return err + } + log.ZDebug(ctx, "black from local", "data", localData) + return f.blockSyncer.Sync(ctx, datautil.Batch(ServerBlackToLocalBlack, serverData), localData, nil, false, true) +} + func (f *Friend) GetDesignatedFriends(ctx context.Context, friendIDs []string) ([]*sdkws.FriendInfo, error) { resp := &friend.GetDesignatedFriendsResp{} if err := util.ApiPost(ctx, constant.GetDesignatedFriendsRouter, &friend.GetDesignatedFriendsReq{OwnerUserID: f.loginUserID, FriendUserIDs: friendIDs}, &resp); err != nil { diff --git a/go/chao-sdk-core/internal/group/sync.go b/go/chao-sdk-core/internal/group/sync.go index 16b338c..5ca7513 100644 --- a/go/chao-sdk-core/internal/group/sync.go +++ b/go/chao-sdk-core/internal/group/sync.go @@ -246,6 +246,22 @@ func (g *Group) SyncAllSelfGroupApplication(ctx context.Context) error { return nil } +func (g *Group) SyncAllSelfGroupApplicationWithoutNotice(ctx context.Context) error { + list, err := g.GetServerSelfGroupApplication(ctx) + if err != nil { + return err + } + localData, err := g.db.GetSendGroupApplication(ctx) + if err != nil { + return err + } + if err := g.groupRequestSyncer.Sync(ctx, datautil.Batch(ServerGroupRequestToLocalGroupRequest, list), localData, nil, false, true); err != nil { + return err + } + // todo + return nil +} + func (g *Group) SyncSelfGroupApplications(ctx context.Context, groupIDs ...string) error { return g.SyncAllSelfGroupApplication(ctx) } @@ -262,6 +278,18 @@ func (g *Group) SyncAllAdminGroupApplication(ctx context.Context) error { return g.groupAdminRequestSyncer.Sync(ctx, datautil.Batch(ServerGroupRequestToLocalAdminGroupRequest, requests), localData, nil) } +func (g *Group) SyncAllAdminGroupApplicationWithoutNotice(ctx context.Context) error { + requests, err := g.GetServerAdminGroupApplicationList(ctx) + if err != nil { + return err + } + localData, err := g.db.GetAdminGroupApplication(ctx) + if err != nil { + return err + } + return g.groupAdminRequestSyncer.Sync(ctx, datautil.Batch(ServerGroupRequestToLocalAdminGroupRequest, requests), localData, nil, false, true) +} + func (g *Group) SyncAdminGroupApplications(ctx context.Context, groupIDs ...string) error { return g.SyncAllAdminGroupApplication(ctx) } diff --git a/go/chao-sdk-core/internal/interaction/msg_sync.go b/go/chao-sdk-core/internal/interaction/msg_sync.go index 43475b7..58f9a50 100644 --- a/go/chao-sdk-core/internal/interaction/msg_sync.go +++ b/go/chao-sdk-core/internal/interaction/msg_sync.go @@ -16,6 +16,7 @@ package interaction import ( "context" + "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" "strings" "github.com/openimsdk/openim-sdk-core/v3/pkg/common" @@ -148,15 +149,31 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma messagesSeqMap[conversationID] = seq } } + + var notificationSeqs []*model_struct.NotificationSeqs + for conversationID, seq := range notificationsSeqMap { - err := m.db.SetNotificationSeq(ctx, conversationID, seq) - if err != nil { - log.ZWarn(ctx, "SetNotificationSeq err", err, "conversationID", conversationID, "seq", seq) - continue - } else { - m.syncedMaxSeqs[conversationID] = seq - } + notificationSeqs = append(notificationSeqs, &model_struct.NotificationSeqs{ + ConversationID: conversationID, + Seq: seq, + }) + m.syncedMaxSeqs[conversationID] = seq } + + err := m.db.BatchInsertNotificationSeq(ctx, notificationSeqs) + if err != nil { + log.ZWarn(ctx, "BatchInsertNotificationSeq err", err) + } + + //for conversationID, seq := range notificationsSeqMap { + // err := m.db.SetNotificationSeq(ctx, conversationID, seq) + // if err != nil { + // log.ZWarn(ctx, "SetNotificationSeq err", err, "conversationID", conversationID, "seq", seq) + // continue + // } else { + // m.syncedMaxSeqs[conversationID] = seq + // } + //} for conversationID, maxSeq := range messagesSeqMap { if syncedMaxSeq, ok := m.syncedMaxSeqs[conversationID]; ok { if maxSeq > syncedMaxSeq { @@ -217,7 +234,12 @@ func (m *MsgSyncer) pushTriggerAndSync(ctx context.Context, pullMsgs map[string] // Called after successful reconnection to synchronize the latest message func (m *MsgSyncer) doConnected(ctx context.Context) { - common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncBegin}, m.conversationCh) + reinstalled := m.reinstalled + if reinstalled { + common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.AppDataSyncStart}, m.conversationCh) + } else { + common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncBegin}, m.conversationCh) + } var resp sdkws.GetMaxSeqResp if err := m.longConnMgr.SendReqWaitResp(m.ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil { log.ZError(m.ctx, "get max seq error", err) @@ -227,7 +249,11 @@ func (m *MsgSyncer) doConnected(ctx context.Context) { log.ZDebug(m.ctx, "get max seq success", "resp", resp) } m.compareSeqsAndBatchSync(ctx, resp.MaxSeqs, connectPullNums) - common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncEnd}, m.conversationCh) + if reinstalled { + common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.AppDataSyncFinish}, m.conversationCh) + } else { + common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncEnd}, m.conversationCh) + } } func IsNotification(conversationID string) bool { @@ -363,24 +389,24 @@ func (m *MsgSyncer) syncMsgBySeqs(ctx context.Context, conversationID string, se // triggers a conversation with a new message. func (m *MsgSyncer) triggerConversation(ctx context.Context, msgs map[string]*sdkws.PullMsgs) error { - if len(msgs) >= 0 { + if len(msgs) > 0 { err := common.TriggerCmdNewMsgCome(ctx, sdk_struct.CmdNewMsgComeToConversation{Msgs: msgs}, m.conversationCh) if err != nil { log.ZError(ctx, "triggerCmdNewMsgCome err", err, "msgs", msgs) } log.ZDebug(ctx, "triggerConversation", "msgs", msgs) return err + } else { + log.ZDebug(ctx, "triggerConversation is nil", "msgs", msgs) } return nil } func (m *MsgSyncer) triggerNotification(ctx context.Context, msgs map[string]*sdkws.PullMsgs) error { - if len(msgs) >= 0 { - err := common.TriggerCmdNotification(ctx, sdk_struct.CmdNewMsgComeToConversation{Msgs: msgs}, m.conversationCh) - if err != nil { - log.ZError(ctx, "triggerCmdNewMsgCome err", err, "msgs", msgs) - } - return err + if len(msgs) > 0 { + common.TriggerCmdNotification(ctx, sdk_struct.CmdNewMsgComeToConversation{Msgs: msgs}, m.conversationCh) + } else { + log.ZDebug(ctx, "triggerNotification is nil", "msgs", msgs) } return nil diff --git a/go/chao-sdk-core/internal/user/sync.go b/go/chao-sdk-core/internal/user/sync.go index e3fd118..90b6f40 100644 --- a/go/chao-sdk-core/internal/user/sync.go +++ b/go/chao-sdk-core/internal/user/sync.go @@ -22,10 +22,9 @@ import ( "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" userPb "github.com/openimsdk/protocol/user" - "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" "gorm.io/gorm" ) @@ -45,6 +44,22 @@ func (u *User) SyncLoginUserInfo(ctx context.Context) error { log.ZDebug(ctx, "SyncLoginUserInfo", "remoteUser", remoteUser, "localUser", localUser) return u.userSyncer.Sync(ctx, []*model_struct.LocalUser{remoteUser}, localUsers, nil) } +func (u *User) SyncLoginUserInfoWithoutNotice(ctx context.Context) error { + remoteUser, err := u.GetSingleUserFromSvr(ctx, u.loginUserID) + if err != nil { + return err + } + localUser, err := u.GetLoginUser(ctx, u.loginUserID) + if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound { + log.ZError(ctx, "SyncLoginUserInfo", err) + } + var localUsers []*model_struct.LocalUser + if err == nil { + localUsers = []*model_struct.LocalUser{localUser} + } + log.ZDebug(ctx, "SyncLoginUserInfo", "remoteUser", remoteUser, "localUser", localUser) + return u.userSyncer.Sync(ctx, []*model_struct.LocalUser{remoteUser}, localUsers, nil, false, true) +} func (u *User) SyncUserStatus(ctx context.Context, fromUserID string, status int32, platformID int32) { userOnlineStatus := userPb.OnlineStatus{ @@ -95,3 +110,19 @@ func (u *User) SyncAllCommand(ctx context.Context) error { log.ZDebug(ctx, "sync command", "data from server", serverData, "data from local", localData) return u.commandSyncer.Sync(ctx, datautil.Batch(ServerCommandToLocalCommand, serverData.CommandResp), localData, nil) } + +func (u *User) SyncAllCommandWithoutNotice(ctx context.Context) error { + var serverData CommandInfoResponse + err := util.ApiPost(ctx, constant.ProcessUserCommandGetAll, userPb.ProcessUserCommandGetAllReq{ + UserID: u.loginUserID, + }, &serverData) + if err != nil { + return err + } + localData, err := u.DataBase.ProcessUserCommandGetAll(ctx) + if err != nil { + return err + } + log.ZDebug(ctx, "sync command", "data from server", serverData, "data from local", localData) + return u.commandSyncer.Sync(ctx, datautil.Batch(ServerCommandToLocalCommand, serverData.CommandResp), localData, nil, false, true) +} diff --git a/go/chao-sdk-core/open_im_sdk/em.go b/go/chao-sdk-core/open_im_sdk/em.go index 9ef80a8..e7f0e7f 100644 --- a/go/chao-sdk-core/open_im_sdk/em.go +++ b/go/chao-sdk-core/open_im_sdk/em.go @@ -142,17 +142,21 @@ func newEmptyConversationListener(ctx context.Context) open_im_sdk_callback.OnCo return &emptyConversationListener{ctx: ctx} } -func (e *emptyConversationListener) OnSyncServerStart() { - +func (e *emptyConversationListener) OnSyncServerStart(reinstalled bool) { log.ZWarn(e.ctx, "ConversationListener is not implemented", nil) } -func (e *emptyConversationListener) OnSyncServerFinish() { +func (e *emptyConversationListener) OnSyncServerProgress(progress int) { + log.ZWarn(e.ctx, "ConversationListener is not implemented", nil, + "progress", progress) +} + +func (e *emptyConversationListener) OnSyncServerFinish(reinstalled bool) { log.ZWarn(e.ctx, "ConversationListener is not implemented", nil) } -func (e *emptyConversationListener) OnSyncServerFailed() { +func (e *emptyConversationListener) OnSyncServerFailed(reinstalled bool) { log.ZWarn(e.ctx, "ConversationListener is not implemented", nil) } diff --git a/go/chao-sdk-core/open_im_sdk_callback/callback_client.go b/go/chao-sdk-core/open_im_sdk_callback/callback_client.go index e5643cd..0a003ce 100644 --- a/go/chao-sdk-core/open_im_sdk_callback/callback_client.go +++ b/go/chao-sdk-core/open_im_sdk_callback/callback_client.go @@ -57,10 +57,10 @@ type OnFriendshipListener interface { OnBlackDeleted(blackInfo string) } type OnConversationListener interface { - OnSyncServerStart() - OnSyncServerFinish() - //OnSyncServerProgress(progress int) - OnSyncServerFailed() + OnSyncServerStart(reinstalled bool) + OnSyncServerFinish(reinstalled bool) + OnSyncServerProgress(progress int) + OnSyncServerFailed(reinstalled bool) OnNewConversation(conversationList string) OnConversationChanged(conversationList string) OnTotalUnreadMessageCountChanged(totalUnreadCount int32) diff --git a/go/chao-sdk-core/pkg/common/trigger_channel.go b/go/chao-sdk-core/pkg/common/trigger_channel.go index cb8d1f9..6ca38db 100644 --- a/go/chao-sdk-core/pkg/common/trigger_channel.go +++ b/go/chao-sdk-core/pkg/common/trigger_channel.go @@ -44,22 +44,12 @@ func TriggerCmdNewMsgCome(ctx context.Context, msg sdk_struct.CmdNewMsgComeToCon return sendCmd(conversationCh, c2v, 100) } -func TriggerCmdSuperGroupMsgCome(msg sdk_struct.CmdNewMsgComeToConversation, conversationCh chan Cmd2Value) error { - if conversationCh == nil { - return utils.Wrap(errors.New("ch == nil"), "") - } - - c2v := Cmd2Value{Cmd: constant.CmdSuperGroupMsgCome, Value: msg} - return sendCmd(conversationCh, c2v, 100) -} - -func TriggerCmdNotification(ctx context.Context, msg sdk_struct.CmdNewMsgComeToConversation, conversationCh chan Cmd2Value) error { - if conversationCh == nil { - return utils.Wrap(errors.New("ch == nil"), "") - } - +func TriggerCmdNotification(ctx context.Context, msg sdk_struct.CmdNewMsgComeToConversation, conversationCh chan Cmd2Value) { c2v := Cmd2Value{Cmd: constant.CmdNotification, Value: msg, Ctx: ctx} - return sendCmd(conversationCh, c2v, 100) + err := sendCmd(conversationCh, c2v, 100) + if err != nil { + log.ZWarn(ctx, "TriggerCmdNotification error", err, "msg", msg) + } } func TriggerCmdWakeUp(ch chan Cmd2Value) error { diff --git a/go/chao-sdk-core/pkg/constant/constant.go b/go/chao-sdk-core/pkg/constant/constant.go index e46f480..aaa9068 100644 --- a/go/chao-sdk-core/pkg/constant/constant.go +++ b/go/chao-sdk-core/pkg/constant/constant.go @@ -403,6 +403,8 @@ const ( MsgSyncProcessing = 1002 // MsgSyncEnd = 1003 // MsgSyncFailed = 1004 + AppDataSyncStart = 1005 + AppDataSyncFinish = 1006 ) const ( diff --git a/go/chao-sdk-core/pkg/db/db_interface/databse.go b/go/chao-sdk-core/pkg/db/db_interface/databse.go index c2b9dd2..3c92407 100644 --- a/go/chao-sdk-core/pkg/db/db_interface/databse.go +++ b/go/chao-sdk-core/pkg/db/db_interface/databse.go @@ -173,6 +173,7 @@ type MessageModel interface { DeleteConversationMsgs(ctx context.Context, conversationID string, msgIDs []string) error // DeleteConversationMsgsBySeqs(ctx context.Context, conversationID string, seqs []int64) error SetNotificationSeq(ctx context.Context, conversationID string, seq int64) error + BatchInsertNotificationSeq(ctx context.Context, notificationSeqs []*model_struct.NotificationSeqs) error GetNotificationAllSeqs(ctx context.Context) ([]*model_struct.NotificationSeqs, error) } diff --git a/go/chao-sdk-core/pkg/db/notification_model.go b/go/chao-sdk-core/pkg/db/notification_model.go index 70fc138..89a28bd 100644 --- a/go/chao-sdk-core/pkg/db/notification_model.go +++ b/go/chao-sdk-core/pkg/db/notification_model.go @@ -21,6 +21,7 @@ import ( "context" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" + "github.com/openimsdk/tools/errs" ) func (d *DataBase) SetNotificationSeq(ctx context.Context, conversationID string, seq int64) error { @@ -36,6 +37,12 @@ func (d *DataBase) SetNotificationSeq(ctx context.Context, conversationID string return nil } +func (d *DataBase) BatchInsertNotificationSeq(ctx context.Context, notificationSeqs []*model_struct.NotificationSeqs) error { + d.mRWMutex.Lock() + defer d.mRWMutex.Unlock() + return errs.WrapMsg(d.conn.WithContext(ctx).Create(notificationSeqs).Error, "BatchInsertNotificationSeq failed") +} + func (d *DataBase) GetNotificationAllSeqs(ctx context.Context) ([]*model_struct.NotificationSeqs, error) { d.mRWMutex.Lock() defer d.mRWMutex.Unlock() diff --git a/go/chao-sdk-core/test/t_conversation_msg.go b/go/chao-sdk-core/test/t_conversation_msg.go index c0d1373..cfcb56c 100644 --- a/go/chao-sdk-core/test/t_conversation_msg.go +++ b/go/chao-sdk-core/test/t_conversation_msg.go @@ -653,16 +653,16 @@ func (c *conversationCallBack) OnSyncServerProgress(progress int) { log.ZInfo(ctx, utils.GetSelfFuncName(), "progress", progress) } -func (c *conversationCallBack) OnSyncServerStart() { +func (c *conversationCallBack) OnSyncServerStart(reinstalled bool) { } -func (c *conversationCallBack) OnSyncServerFinish() { +func (c *conversationCallBack) OnSyncServerFinish(reinstalled bool) { c.SyncFlag = 1 log.ZInfo(ctx, utils.GetSelfFuncName()) } -func (c *conversationCallBack) OnSyncServerFailed() { +func (c *conversationCallBack) OnSyncServerFailed(reinstalled bool) { log.ZInfo(ctx, utils.GetSelfFuncName()) } diff --git a/go/export.go b/go/export.go index d922b70..58b3ca0 100644 --- a/go/export.go +++ b/go/export.go @@ -77,16 +77,28 @@ func NewConversationCallback(cCallback C.CB_I_S) *ConversationCallback { return &ConversationCallback{cCallback: cCallback} } -func (c ConversationCallback) OnSyncServerStart() { - C.Call_CB_I_S(c.cCallback, SYNC_SERVER_START, NO_DATA) +func (c ConversationCallback) OnSyncServerStart(reinstalled bool) { + m := make(map[string]any) + m["reinstalled"] = reinstalled + C.Call_CB_I_S(c.cCallback, SYNC_SERVER_START, C.CString(StructToJsonString(m))) +} + +func (c ConversationCallback) OnSyncServerProgress(progress int) { + m := make(map[string]any) + m["progress"] = progress + C.Call_CB_I_S(c.cCallback, SYNC_SERVER_PROGRESS, C.CString(StructToJsonString(m))) } -func (c ConversationCallback) OnSyncServerFinish() { - C.Call_CB_I_S(c.cCallback, SYNC_SERVER_FINISH, NO_DATA) +func (c ConversationCallback) OnSyncServerFinish(reinstalled bool) { + m := make(map[string]any) + m["reinstalled"] = reinstalled + C.Call_CB_I_S(c.cCallback, SYNC_SERVER_FINISH, C.CString(StructToJsonString(m))) } -func (c ConversationCallback) OnSyncServerFailed() { - C.Call_CB_I_S(c.cCallback, SYNC_SERVER_FAILED, NO_DATA) +func (c ConversationCallback) OnSyncServerFailed(reinstalled bool) { + m := make(map[string]any) + m["reinstalled"] = reinstalled + C.Call_CB_I_S(c.cCallback, SYNC_SERVER_FAILED, C.CString(StructToJsonString(m))) } func (c ConversationCallback) OnNewConversation(conversationList string) {