// Copyright © 2023 OpenIM SDK. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package interaction

import (
	"context"
	"strings"
	"sync"
	"time"

	"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"

	"github.com/openimsdk/openim-sdk-core/v3/pkg/common"
	"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
	"github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface"
	"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
	"github.com/openimsdk/protocol/sdkws"
	"github.com/openimsdk/tools/log"
)

const (
	// connectPullNums is the per-range message count requested by the sync that
	// runs right after the long connection is established.
	connectPullNums = 1
	// defaultPullNums is the per-range message count requested by syncs started
	// from a max-seq event or from a pushed message with a seq gap.
	defaultPullNums = 10
	// SplitPullMsgNum is the batching threshold used when a sync is split into
	// several pull requests.
	SplitPullMsgNum = 100
)

// MsgSyncer keeps the local message store in step with the server. It compares
// the seqs it has already synced with the max seqs reported by the server, pulls
// what is missing over the long connection, and forwards the pulled and pushed
// messages to the conversation channel.
type MsgSyncer struct {
	loginUserID        string                // login user ID
	longConnMgr        *LongConnMgr          // long connection manager
	PushMsgAndMaxSeqCh chan common.Cmd2Value // channel for receiving push messages and the maximum SEQ number
	conversationCh     chan common.Cmd2Value // channel that synced messages and sync-state flags are sent to
	syncedMaxSeqs      map[string]int64      // max synced SEQ per conversation ID (message and notification conversations)
	db                 db_interface.DataBase // data store
	syncTimes          int                   // times of sync
	ctx                context.Context       // context
	reinstalled        bool                  // true when no local conversation was found at load time (treated as a reinstall)
}

// NewMsgSyncer creates a new instance of the message synchronizer.
func NewMsgSyncer(ctx context.Context, conversationCh, PushMsgAndMaxSeqCh chan common.Cmd2Value, loginUserID string, longConnMgr *LongConnMgr, db db_interface.DataBase, syncTimes int) (*MsgSyncer, error) { m := &MsgSyncer{ loginUserID: loginUserID, longConnMgr: longConnMgr, PushMsgAndMaxSeqCh: PushMsgAndMaxSeqCh, conversationCh: conversationCh, ctx: ctx, syncedMaxSeqs: make(map[string]int64), db: db, syncTimes: syncTimes, } if err := m.loadSeq(ctx); err != nil { log.ZError(ctx, "loadSeq err", err) return nil, err } return m, nil } // seq The db reads the data to the memory,set syncedMaxSeqs func (m *MsgSyncer) loadSeq(ctx context.Context) error { conversationIDList, err := m.db.GetAllConversationIDList(ctx) if err != nil { log.ZError(ctx, "get conversation id list failed", err) return err } if len(conversationIDList) == 0 { m.reinstalled = true } // TODO With a large number of sessions, this could potentially cause blocking and needs optimization. type SyncedSeq struct { ConversationID string MaxSyncedSeq int64 Err error } concurrency := 10 t2 := time.Now() SyncedSeqs := make(chan SyncedSeq, len(conversationIDList)) sem := make(chan struct{}, concurrency) var wg sync.WaitGroup for _, v := range conversationIDList { wg.Add(1) sem <- struct{}{} // Acquire a token go func(conversationID string) { defer wg.Done() defer func() { <-sem }() // Release the token maxSyncedSeq, err := m.db.GetConversationNormalMsgSeq(ctx, conversationID) SyncedSeqs <- SyncedSeq{ ConversationID: conversationID, MaxSyncedSeq: maxSyncedSeq, Err: err, } }(v) log.ZDebug(ctx, "goroutine done.", "goroutine cost time", time.Since(t2)) } // Close the results channel once all goroutines have finished go func() { wg.Wait() close(SyncedSeqs) }() // Collect the results for res := range SyncedSeqs { if res.Err != nil { log.ZError(ctx, "get group normal seq failed", res.Err, "conversationID", res.ConversationID) } else { m.syncedMaxSeqs[res.ConversationID] = res.MaxSyncedSeq } } notificationSeqs, err := 
m.db.GetNotificationAllSeqs(ctx) if err != nil { log.ZError(ctx, "get notification seq failed", err) return err } for _, notificationSeq := range notificationSeqs { m.syncedMaxSeqs[notificationSeq.ConversationID] = notificationSeq.Seq } log.ZDebug(ctx, "loadSeq", "syncedMaxSeqs", m.syncedMaxSeqs) return nil } // DoListener Listen to the message pipe of the message synchronizer // and process received and pushed messages func (m *MsgSyncer) DoListener(ctx context.Context) { for { select { case cmd := <-m.PushMsgAndMaxSeqCh: m.handlePushMsgAndEvent(cmd) case <-ctx.Done(): log.ZInfo(m.ctx, "msg syncer done, sdk logout.....") return } } } // get seqs need sync interval func (m *MsgSyncer) getSeqsNeedSync(syncedMaxSeq, maxSeq int64) []int64 { var seqs []int64 for i := syncedMaxSeq + 1; i <= maxSeq; i++ { seqs = append(seqs, i) } return seqs } // recv msg from func (m *MsgSyncer) handlePushMsgAndEvent(cmd common.Cmd2Value) { switch cmd.Cmd { case constant.CmdConnSuccesss: log.ZInfo(cmd.Ctx, "recv long conn mgr connected", "cmd", cmd.Cmd, "value", cmd.Value) m.doConnected(cmd.Ctx) case constant.CmdMaxSeq: log.ZInfo(cmd.Ctx, "recv max seqs from long conn mgr, start sync msgs", "cmd", cmd.Cmd, "value", cmd.Value) m.compareSeqsAndBatchSync(cmd.Ctx, cmd.Value.(*sdk_struct.CmdMaxSeqToMsgSync).ConversationMaxSeqOnSvr, defaultPullNums) case constant.CmdPushMsg: m.doPushMsg(cmd.Ctx, cmd.Value.(*sdkws.PushMessages)) } } func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync map[string]int64, pullNums int64) { needSyncSeqMap := make(map[string][2]int64) //when app reinstalled do not pull notifications messages. 
if m.reinstalled { notificationsSeqMap := make(map[string]int64) messagesSeqMap := make(map[string]int64) for conversationID, seq := range maxSeqToSync { if IsNotification(conversationID) { notificationsSeqMap[conversationID] = seq } else { messagesSeqMap[conversationID] = seq } } var notificationSeqs []*model_struct.NotificationSeqs for conversationID, seq := range notificationsSeqMap { notificationSeqs = append(notificationSeqs, &model_struct.NotificationSeqs{ ConversationID: conversationID, Seq: seq, }) m.syncedMaxSeqs[conversationID] = seq } err := m.db.BatchInsertNotificationSeq(ctx, notificationSeqs) if err != nil { log.ZWarn(ctx, "BatchInsertNotificationSeq err", err) } //for conversationID, seq := range notificationsSeqMap { // err := m.db.SetNotificationSeq(ctx, conversationID, seq) // if err != nil { // log.ZWarn(ctx, "SetNotificationSeq err", err, "conversationID", conversationID, "seq", seq) // continue // } else { // m.syncedMaxSeqs[conversationID] = seq // } //} for conversationID, maxSeq := range messagesSeqMap { if syncedMaxSeq, ok := m.syncedMaxSeqs[conversationID]; ok { if maxSeq > syncedMaxSeq { needSyncSeqMap[conversationID] = [2]int64{syncedMaxSeq + 1, maxSeq} } } else { needSyncSeqMap[conversationID] = [2]int64{0, maxSeq} } } m.reinstalled = false } else { for conversationID, maxSeq := range maxSeqToSync { if syncedMaxSeq, ok := m.syncedMaxSeqs[conversationID]; ok { if maxSeq > syncedMaxSeq { needSyncSeqMap[conversationID] = [2]int64{syncedMaxSeq + 1, maxSeq} } } else { needSyncSeqMap[conversationID] = [2]int64{0, maxSeq} } } } _ = m.syncAndTriggerMsgs(m.ctx, needSyncSeqMap, pullNums) } func (m *MsgSyncer) doPushMsg(ctx context.Context, push *sdkws.PushMessages) { log.ZDebug(ctx, "push msgs", "push", push, "syncedMaxSeqs", m.syncedMaxSeqs) m.pushTriggerAndSync(ctx, push.Msgs, m.triggerConversation) m.pushTriggerAndSync(ctx, push.NotificationMsgs, m.triggerNotification) } func (m *MsgSyncer) pushTriggerAndSync(ctx context.Context, pullMsgs 
map[string]*sdkws.PullMsgs, triggerFunc func(ctx context.Context, msgs map[string]*sdkws.PullMsgs) error) { if len(pullMsgs) == 0 { return } needSyncSeqMap := make(map[string][2]int64) var lastSeq int64 var storageMsgs []*sdkws.MsgData for conversationID, msgs := range pullMsgs { for _, msg := range msgs.Msgs { if msg.Seq == 0 { _ = triggerFunc(ctx, map[string]*sdkws.PullMsgs{conversationID: {Msgs: []*sdkws.MsgData{msg}}}) continue } lastSeq = msg.Seq storageMsgs = append(storageMsgs, msg) } if lastSeq == m.syncedMaxSeqs[conversationID]+int64(len(storageMsgs)) && lastSeq != 0 { log.ZDebug(ctx, "trigger msgs", "msgs", storageMsgs) _ = triggerFunc(ctx, map[string]*sdkws.PullMsgs{conversationID: {Msgs: storageMsgs}}) m.syncedMaxSeqs[conversationID] = lastSeq } else if lastSeq != 0 && lastSeq > m.syncedMaxSeqs[conversationID] { //must pull message when message type is notification needSyncSeqMap[conversationID] = [2]int64{m.syncedMaxSeqs[conversationID] + 1, lastSeq} } } m.syncAndTriggerMsgs(ctx, needSyncSeqMap, defaultPullNums) } // Called after successful reconnection to synchronize the latest message func (m *MsgSyncer) doConnected(ctx context.Context) { reinstalled := m.reinstalled if reinstalled { common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.AppDataSyncStart}, m.conversationCh) } else { common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncBegin}, m.conversationCh) } var resp sdkws.GetMaxSeqResp if err := m.longConnMgr.SendReqWaitResp(m.ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil { log.ZError(m.ctx, "get max seq error", err) common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncFailed}, m.conversationCh) return } else { log.ZDebug(m.ctx, "get max seq success", "resp", resp.MaxSeqs) } m.compareSeqsAndBatchSync(ctx, resp.MaxSeqs, connectPullNums) if reinstalled { 
common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.AppDataSyncFinish}, m.conversationCh) } else { common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncEnd}, m.conversationCh) } } func IsNotification(conversationID string) bool { return strings.HasPrefix(conversationID, "n_") } // Fragment synchronization message, seq refresh after successful trigger func (m *MsgSyncer) syncAndTriggerMsgs(ctx context.Context, seqMap map[string][2]int64, syncMsgNum int64) error { if len(seqMap) > 0 { log.ZDebug(ctx, "current sync seqMap", "seqMap", seqMap) tempSeqMap := make(map[string][2]int64, 50) msgNum := 0 for k, v := range seqMap { oneConversationSyncNum := v[1] - v[0] + 1 if (oneConversationSyncNum/SplitPullMsgNum) > 1 && IsNotification(k) { nSeqMap := make(map[string][2]int64, 1) count := int(oneConversationSyncNum / SplitPullMsgNum) startSeq := v[0] var end int64 for i := 0; i <= count; i++ { if i == count { nSeqMap[k] = [2]int64{startSeq, v[1]} } else { end = startSeq + int64(SplitPullMsgNum) if end > v[1] { end = v[1] i = count } nSeqMap[k] = [2]int64{startSeq, end} } resp, err := m.pullMsgBySeqRange(ctx, nSeqMap, syncMsgNum) if err != nil { log.ZError(ctx, "syncMsgFromSvr err", err, "nSeqMap", nSeqMap) return err } _ = m.triggerConversation(ctx, resp.Msgs) _ = m.triggerNotification(ctx, resp.NotificationMsgs) for conversationID, seqs := range nSeqMap { m.syncedMaxSeqs[conversationID] = seqs[1] } startSeq = end + 1 } continue } tempSeqMap[k] = v if oneConversationSyncNum > 0 { msgNum += int(oneConversationSyncNum) } if msgNum >= SplitPullMsgNum { resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum) if err != nil { log.ZError(ctx, "syncMsgFromSvr err", err, "tempSeqMap", tempSeqMap) return err } _ = m.triggerConversation(ctx, resp.Msgs) _ = m.triggerNotification(ctx, resp.NotificationMsgs) for conversationID, seqs := range tempSeqMap { m.syncedMaxSeqs[conversationID] = 
seqs[1] } tempSeqMap = make(map[string][2]int64, 50) msgNum = 0 } } resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum) if err != nil { log.ZError(ctx, "syncMsgFromSvr err", err, "seqMap", seqMap) return err } _ = m.triggerConversation(ctx, resp.Msgs) _ = m.triggerNotification(ctx, resp.NotificationMsgs) for conversationID, seqs := range seqMap { m.syncedMaxSeqs[conversationID] = seqs[1] } } else { log.ZDebug(ctx, "noting conversation to sync", "syncMsgNum", syncMsgNum) } return nil } func (m *MsgSyncer) splitSeqs(split int, seqsNeedSync []int64) (splitSeqs [][]int64) { if len(seqsNeedSync) <= split { splitSeqs = append(splitSeqs, seqsNeedSync) return } for i := 0; i < len(seqsNeedSync); i += split { end := i + split if end > len(seqsNeedSync) { end = len(seqsNeedSync) } splitSeqs = append(splitSeqs, seqsNeedSync[i:end]) } return } func (m *MsgSyncer) pullMsgBySeqRange(ctx context.Context, seqMap map[string][2]int64, syncMsgNum int64) (resp *sdkws.PullMessageBySeqsResp, err error) { log.ZDebug(ctx, "pullMsgBySeqRange", "seqMap", seqMap, "syncMsgNum", syncMsgNum) req := sdkws.PullMessageBySeqsReq{UserID: m.loginUserID} for conversationID, seqs := range seqMap { req.SeqRanges = append(req.SeqRanges, &sdkws.SeqRange{ ConversationID: conversationID, Begin: seqs[0], End: seqs[1], Num: syncMsgNum, }) } resp = &sdkws.PullMessageBySeqsResp{} if err := m.longConnMgr.SendReqWaitResp(ctx, &req, constant.PullMsgBySeqList, resp); err != nil { return nil, err } return resp, nil } // synchronizes messages by SEQs. 
func (m *MsgSyncer) syncMsgBySeqs(ctx context.Context, conversationID string, seqsNeedSync []int64) (allMsgs []*sdkws.MsgData, err error) { pullMsgReq := sdkws.PullMessageBySeqsReq{} pullMsgReq.UserID = m.loginUserID split := constant.SplitPullMsgNum seqsList := m.splitSeqs(split, seqsNeedSync) for i := 0; i < len(seqsList); { var pullMsgResp sdkws.PullMessageBySeqsResp err := m.longConnMgr.SendReqWaitResp(ctx, &pullMsgReq, constant.PullMsgBySeqList, &pullMsgResp) if err != nil { log.ZError(ctx, "syncMsgFromSvrSplit err", err, "pullMsgReq", pullMsgReq) continue } i++ allMsgs = append(allMsgs, pullMsgResp.Msgs[conversationID].Msgs...) } return allMsgs, nil } // triggers a conversation with a new message. func (m *MsgSyncer) triggerConversation(ctx context.Context, msgs map[string]*sdkws.PullMsgs) error { if len(msgs) > 0 { err := common.TriggerCmdNewMsgCome(ctx, sdk_struct.CmdNewMsgComeToConversation{Msgs: msgs}, m.conversationCh) if err != nil { log.ZError(ctx, "triggerCmdNewMsgCome err", err, "msgs", msgs) } log.ZDebug(ctx, "triggerConversation", "msgs", msgs) return err } else { log.ZDebug(ctx, "triggerConversation is nil", "msgs", msgs) } return nil } func (m *MsgSyncer) triggerNotification(ctx context.Context, msgs map[string]*sdkws.PullMsgs) error { if len(msgs) > 0 { common.TriggerCmdNotification(ctx, sdk_struct.CmdNewMsgComeToConversation{Msgs: msgs}, m.conversationCh) } else { log.ZDebug(ctx, "triggerNotification is nil", "msgs", msgs) } return nil }