diff --git a/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt b/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt index 08632477..1d85e864 100644 --- a/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt +++ b/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt @@ -25,7 +25,9 @@ abstract class PowerSyncBackendConnector { * These credentials may have expired already. */ suspend fun getCredentialsCached(): PowerSyncCredentials? { - cachedCredentials?.let { return it } + if (cachedCredentials != null) { + return cachedCredentials + } return prefetchCredentials() } @@ -51,6 +53,7 @@ abstract class PowerSyncBackendConnector { fetchRequest = fetchRequest ?: GlobalScope.async { fetchCredentials().also { value -> cachedCredentials = value + fetchRequest = null } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index a6014a6d..62426950 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -17,8 +17,10 @@ import io.ktor.client.request.get import io.ktor.client.request.headers import io.ktor.client.request.preparePost import io.ktor.client.request.setBody +import io.ktor.client.statement.bodyAsText import io.ktor.http.ContentType import io.ktor.http.HttpHeaders +import io.ktor.http.HttpStatusCode import io.ktor.http.contentType import io.ktor.utils.io.* import kotlinx.coroutines.delay @@ -84,15 +86,21 @@ class SyncStream( connector.invalidateCredentials() invalidCredentials = false } - streamingSyncIteration() + val state = streamingSyncIteration() + if (!state.retry) { + break; + } } catch (e: Exception) { println("SyncStream::streamingSync Error: $e") invalidCredentials = true + updateStatus( + downloadError = e + ) + } finally { updateStatus( connected = false, connecting = true, downloading = false, - downloadError = e ) delay(retryDelay) } @@ -180,6 +188,14 @@ class SyncStream( } request.execute { httpResponse -> + if (httpResponse.status.value == 401) { + connector.invalidateCredentials() + } + + if (httpResponse.status != HttpStatusCode.OK) { + throw RuntimeException("Received error when connecting to sync stream: ${httpResponse.bodyAsText()}") + } + val channel: ByteReadChannel = httpResponse.body() while (!channel.isClosedForRead) { @@ -191,7 +207,7 @@ class SyncStream( } } - private suspend fun streamingSyncIteration() { + private suspend fun streamingSyncIteration(): SyncStreamState { val bucketEntries = bucketStorage.getBucketStates() val initialBuckets = mutableMapOf() @@ -211,14 +227,11 @@ class SyncStream( buckets = initialBuckets.map { (bucket, after) -> BucketRequest(bucket, after) }, ) - streamingSyncRequest(req).retryWhen { cause, attempt -> - println("SyncStream::streamingSyncIteration Error: $cause") - delay(retryDelay) - println("SyncStream::streamingSyncIteration Retrying attempt: $attempt") - true - }.collect { value -> + streamingSyncRequest(req).collect { value -> handleInstruction(value, state) } + + return state; } private suspend fun handleInstruction( @@ -368,6 +381,7 @@ class SyncStream( if (tokenExpiresIn <= 0) { // Connection would be closed automatically right after this println("Token expiring reconnect") + connector.invalidateCredentials() state.retry = true return state } diff --git a/demos/hello-powersync/iosApp/Podfile.lock b/demos/hello-powersync/iosApp/Podfile.lock index 2f512b40..8c576a1c 100644 --- a/demos/hello-powersync/iosApp/Podfile.lock +++ b/demos/hello-powersync/iosApp/Podfile.lock @@ -20,4 +20,4 @@ SPEC CHECKSUMS: PODFILE CHECKSUM: 4680f51fbb293d1385fb2467ada435cc1f16ab3d -COCOAPODS: 1.14.3 +COCOAPODS: 1.13.0