@@ -13,6 +13,7 @@ import android.util.Log
1313import com.nextcloud.talk.chat.ChatActivity
1414import com.nextcloud.talk.chat.data.ChatMessageRepository
1515import com.nextcloud.talk.chat.data.model.ChatMessage
16+ import com.nextcloud.talk.chat.domain.ChatPullResult
1617import com.nextcloud.talk.data.database.dao.ChatBlocksDao
1718import com.nextcloud.talk.data.database.dao.ChatMessagesDao
1819import com.nextcloud.talk.data.database.mappers.asEntity
@@ -25,14 +26,11 @@ import com.nextcloud.talk.data.user.model.User
2526import com.nextcloud.talk.extensions.toIntOrZero
2627import com.nextcloud.talk.models.domain.ConversationModel
2728import com.nextcloud.talk.models.json.chat.ChatMessageJson
28- import com.nextcloud.talk.models.json.chat.ChatOverall
2929import com.nextcloud.talk.models.json.chat.ChatOverallSingleMessage
3030import com.nextcloud.talk.models.json.converters.EnumActorTypeConverter
3131import com.nextcloud.talk.models.json.participants.Participant
3232import com.nextcloud.talk.utils.bundle.BundleKeys
3333import com.nextcloud.talk.utils.message.SendMessageUtils
34- import io.reactivex.android.schedulers.AndroidSchedulers
35- import io.reactivex.schedulers.Schedulers
3634import kotlinx.coroutines.CoroutineScope
3735import kotlinx.coroutines.Dispatchers
3836import kotlinx.coroutines.Job
@@ -44,9 +42,11 @@ import kotlinx.coroutines.flow.catch
4442import kotlinx.coroutines.flow.first
4543import kotlinx.coroutines.flow.firstOrNull
4644import kotlinx.coroutines.flow.flow
45+ import kotlinx.coroutines.flow.flowOn
4746import kotlinx.coroutines.flow.map
4847import kotlinx.coroutines.isActive
4948import kotlinx.coroutines.launch
49+ import retrofit2.HttpException
5050import java.io.IOException
5151import javax.inject.Inject
5252import kotlin.collections.any
@@ -540,124 +540,151 @@ class OfflineFirstChatRepository @Inject constructor(
540540 ).map(ChatMessageEntity ::asModel)
541541 }
542542
543- @Suppress(" UNCHECKED_CAST" , " MagicNumber" , " Detekt.TooGenericExceptionCaught" )
544- private fun getMessagesFromServer (bundle : Bundle ): Pair <Int , List <ChatMessageJson >>? {
543+ fun pullMessagesFlow (bundle : Bundle ): Flow <ChatPullResult > = flow {
545544 val fieldMap = bundle.getSerializable(BundleKeys .KEY_FIELD_MAP ) as HashMap <String , Int >
546-
547545 var attempts = 1
546+
548547 while (attempts < 5 ) {
549- Log .d(TAG , " message limit: " + fieldMap[" limit" ])
550- try {
551- val result = network.pullChatMessages(credentials, urlForChatting, fieldMap)
552- .subscribeOn(Schedulers .io())
553- .observeOn(AndroidSchedulers .mainThread())
554- .map { it ->
555- when (it.code()) {
556- HTTP_CODE_OK -> {
557- Log .d(TAG , " getMessagesFromServer HTTP_CODE_OK" )
558- newXChatLastCommonRead = it.headers()[" X-Chat-Last-Common-Read" ]?.let {
559- Integer .parseInt(it)
560- }
561-
562- return @map Pair (
563- HTTP_CODE_OK ,
564- (it.body() as ChatOverall ).ocs!! .data!!
565- )
566- }
567-
568- HTTP_CODE_NOT_MODIFIED -> {
569- Log .d(TAG , " getMessagesFromServer HTTP_CODE_NOT_MODIFIED" )
570-
571- return @map Pair (
572- HTTP_CODE_NOT_MODIFIED ,
573- listOf<ChatMessageJson >()
574- )
575- }
576-
577- HTTP_CODE_PRECONDITION_FAILED -> {
578- Log .d(TAG , " getMessagesFromServer HTTP_CODE_PRECONDITION_FAILED" )
579-
580- return @map Pair (
581- HTTP_CODE_PRECONDITION_FAILED ,
582- listOf<ChatMessageJson >()
583- )
584- }
585-
586- else -> {
587- return @map Pair (
588- HTTP_CODE_PRECONDITION_FAILED ,
589- listOf<ChatMessageJson >()
590- )
591- }
592- }
548+ runCatching {
549+ network.pullChatMessages(credentials, urlForChatting, fieldMap)
550+ }.fold(
551+ onSuccess = { response ->
552+ val result = when (response.code()) {
553+ HTTP_CODE_OK -> ChatPullResult .Success (
554+ messages = response.body()?.ocs?.data.orEmpty(),
555+ lastCommonRead = response.headers()[" X-Chat-Last-Common-Read" ]?.toInt()
556+ )
557+ HTTP_CODE_NOT_MODIFIED -> ChatPullResult .NotModified
558+ HTTP_CODE_PRECONDITION_FAILED -> ChatPullResult .PreconditionFailed
559+ else -> ChatPullResult .Error (HttpException (response))
593560 }
594- .blockingSingle()
595- return result
596- } catch (e: Exception ) {
597- Log .e(TAG , " Something went wrong when pulling chat messages (attempt: $attempts )" , e)
598- attempts++
599561
600- val newMessageLimit = when (attempts) {
601- 2 -> 50
602- 3 -> 10
603- else -> 5
562+ emit(result)
563+ return @flow
564+ },
565+ onFailure = { e ->
566+ Log .e(TAG , " Attempt $attempts failed" , e)
567+ attempts++
568+ fieldMap[" limit" ] = when (attempts) { 2 -> 50 ; 3 -> 10 ; else -> 5 }
604569 }
605- fieldMap[" limit" ] = newMessageLimit
606- }
570+ )
607571 }
608- Log .e(TAG , " All attempts to get messages from server failed" )
609- return null
610- }
572+
573+ emit(ChatPullResult .Error (IllegalStateException (" All attempts failed" )))
574+ }.flowOn(Dispatchers .IO )
575+
576+
577+
578+ // private suspend fun getMessages(bundle: Bundle): List<ChatMessageEntity>? {
579+ // if (!networkMonitor.isOnline.value) {
580+ // Log.d(TAG, "Device is offline, can't load chat messages from server")
581+ // return null
582+ // }
583+ //
584+ // val result = pullMessagesFlow(bundle)
585+ // if (result == null) {
586+ // Log.d(TAG, "No result from server")
587+ // return null
588+ // }
589+ //
590+ // var chatMessagesFromSync: List<ChatMessageEntity>? = null
591+ //
592+ // val fieldMap = bundle.getSerializable(BundleKeys.KEY_FIELD_MAP) as HashMap<String, Int>
593+ // val queriedMessageId = fieldMap["lastKnownMessageId"]
594+ // val lookIntoFuture = fieldMap["lookIntoFuture"] == 1
595+ //
596+ // val statusCode = result.first
597+ //
598+ // val hasHistory = getHasHistory(statusCode, lookIntoFuture)
599+ //
600+ // Log.d(
601+ // TAG,
602+ // "internalConv=$internalConversationId statusCode=$statusCode lookIntoFuture=$lookIntoFuture " +
603+ // "hasHistory=$hasHistory " +
604+ // "queriedMessageId=$queriedMessageId"
605+ // )
606+ //
607+ // val blockContainingQueriedMessage: ChatBlockEntity? = getBlockOfMessage(queriedMessageId)
608+ //
609+ // if (blockContainingQueriedMessage != null && !hasHistory) {
610+ // blockContainingQueriedMessage.hasHistory = false
611+ // chatBlocksDao.upsertChatBlock(blockContainingQueriedMessage)
612+ // Log.d(TAG, "End of chat was reached so hasHistory=false is set")
613+ // }
614+ //
615+ // if (result.second.isNotEmpty()) {
616+ // chatMessagesFromSync = updateMessagesData(
617+ // result.second,
618+ // blockContainingQueriedMessage,
619+ // lookIntoFuture,
620+ // hasHistory
621+ // )
622+ // } else {
623+ // Log.d(TAG, "no data is updated...")
624+ // }
625+ //
626+ // return chatMessagesFromSync
627+ // }
611628
612629 private suspend fun getMessages (bundle : Bundle ): List <ChatMessageEntity >? {
613630 if (! networkMonitor.isOnline.value) {
614631 Log .d(TAG , " Device is offline, can't load chat messages from server" )
615632 return null
616633 }
617634
618- val result = getMessagesFromServer(bundle)
619- if (result == null ) {
620- Log .d(TAG , " No result from server" )
621- return null
622- }
623-
624- var chatMessagesFromSync: List <ChatMessageEntity >? = null
625-
626635 val fieldMap = bundle.getSerializable(BundleKeys .KEY_FIELD_MAP ) as HashMap <String , Int >
627636 val queriedMessageId = fieldMap[" lastKnownMessageId" ]
628637 val lookIntoFuture = fieldMap[" lookIntoFuture" ] == 1
629638
630- val statusCode = result .first
639+ val result = pullMessagesFlow(bundle) .first()
631640
632- val hasHistory = getHasHistory(statusCode, lookIntoFuture)
641+ return when (result) {
633642
634- Log .d(
635- TAG ,
636- " internalConv=$internalConversationId statusCode=$statusCode lookIntoFuture=$lookIntoFuture " +
637- " hasHistory=$hasHistory " +
638- " queriedMessageId=$queriedMessageId "
639- )
643+ is ChatPullResult .Success -> {
644+ val hasHistory = getHasHistory(HTTP_CODE_OK , lookIntoFuture)
640645
641- val blockContainingQueriedMessage: ChatBlockEntity ? = getBlockOfMessage(queriedMessageId)
646+ Log .d(
647+ TAG ,
648+ " internalConv=$internalConversationId statusCode=${HTTP_CODE_OK } lookIntoFuture=$lookIntoFuture " +
649+ " hasHistory=$hasHistory queriedMessageId=$queriedMessageId "
650+ )
642651
643- if (blockContainingQueriedMessage != null && ! hasHistory) {
644- blockContainingQueriedMessage.hasHistory = false
645- chatBlocksDao.upsertChatBlock(blockContainingQueriedMessage)
646- Log .d(TAG , " End of chat was reached so hasHistory=false is set" )
647- }
652+ val blockContainingQueriedMessage: ChatBlockEntity ? = getBlockOfMessage(queriedMessageId)
648653
649- if (result.second.isNotEmpty()) {
650- chatMessagesFromSync = updateMessagesData(
651- result.second,
652- blockContainingQueriedMessage,
653- lookIntoFuture,
654- hasHistory
655- )
656- } else {
657- Log .d(TAG , " no data is updated..." )
658- }
654+ blockContainingQueriedMessage?.takeIf { ! hasHistory }?.apply {
655+ this .hasHistory = false
656+ chatBlocksDao.upsertChatBlock(this )
657+ Log .d(TAG , " End of chat reached, set hasHistory=false" )
658+ }
659+
660+ if (result.messages.isNotEmpty()) {
661+ updateMessagesData(
662+ result.messages,
663+ blockContainingQueriedMessage,
664+ lookIntoFuture,
665+ hasHistory
666+ )
667+ } else {
668+ Log .d(TAG , " No new messages to update" )
669+ null
670+ }
671+ }
659672
660- return chatMessagesFromSync
673+ is ChatPullResult .NotModified -> {
674+ Log .d(TAG , " Server returned NOT_MODIFIED, nothing to update" )
675+ null
676+ }
677+
678+ is ChatPullResult .PreconditionFailed -> {
679+ Log .d(TAG , " Server returned PRECONDITION_FAILED, nothing to update" )
680+ null
681+ }
682+
683+ is ChatPullResult .Error -> {
684+ Log .e(TAG , " Error pulling messages from server" , result.throwable)
685+ null
686+ }
687+ }
661688 }
662689
663690 private suspend fun OfflineFirstChatRepository.updateMessagesData (
0 commit comments